diff --git a/pom.xml b/pom.xml
index e3add9a..052ee97 100644
--- a/pom.xml
+++ b/pom.xml
@@ -100,6 +100,7 @@
5.8.2
3.21.0
+ <mockito.version>4.11.0</mockito.version>
@@ -218,6 +219,12 @@
<version>${fabric8.version}</version>
<scope>test</scope>
+ <dependency>
+ <groupId>org.mockito</groupId>
+ <artifactId>mockito-core</artifactId>
+ <version>${mockito.version}</version>
+ <scope>test</scope>
+ </dependency>
diff --git a/src/main/java/io/strimzi/kafka/access/KafkaAccessReconciler.java b/src/main/java/io/strimzi/kafka/access/KafkaAccessReconciler.java
index 49287df..3d7feee 100644
--- a/src/main/java/io/strimzi/kafka/access/KafkaAccessReconciler.java
+++ b/src/main/java/io/strimzi/kafka/access/KafkaAccessReconciler.java
@@ -15,30 +15,18 @@
import io.javaoperatorsdk.operator.api.reconciler.EventSourceInitializer;
import io.javaoperatorsdk.operator.api.reconciler.Reconciler;
import io.javaoperatorsdk.operator.api.reconciler.UpdateControl;
+import io.javaoperatorsdk.operator.processing.event.ResourceID;
import io.javaoperatorsdk.operator.processing.event.source.EventSource;
import io.javaoperatorsdk.operator.processing.event.source.informer.InformerEventSource;
-import io.strimzi.api.kafka.Crds;
import io.strimzi.api.kafka.model.Kafka;
import io.strimzi.api.kafka.model.KafkaUser;
-import io.strimzi.api.kafka.model.KafkaUserAuthentication;
-import io.strimzi.api.kafka.model.KafkaUserSpec;
-import io.strimzi.api.kafka.model.status.KafkaUserStatus;
-import io.strimzi.kafka.access.internal.KafkaListener;
-import io.strimzi.kafka.access.internal.KafkaAccessParser;
-import io.strimzi.kafka.access.internal.KafkaUserData;
+import io.strimzi.kafka.access.internal.KafkaAccessMapper;
import io.strimzi.kafka.access.model.BindingStatus;
import io.strimzi.kafka.access.model.KafkaAccess;
-import io.strimzi.kafka.access.model.KafkaAccessSpec;
import io.strimzi.kafka.access.model.KafkaAccessStatus;
-import io.strimzi.kafka.access.model.KafkaReference;
-import io.strimzi.kafka.access.model.KafkaUserReference;
-import io.strimzi.kafka.access.internal.KafkaParser;
-import io.strimzi.kafka.access.internal.CustomResourceParseException;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
-import java.nio.charset.StandardCharsets;
-import java.util.Base64;
import java.util.HashMap;
import java.util.Map;
import java.util.Optional;
@@ -50,28 +38,25 @@
@ControllerConfiguration
public class KafkaAccessReconciler implements Reconciler<KafkaAccess>, EventSourceInitializer<KafkaAccess> {
- private static final String TYPE_SECRET_KEY = "type";
- private static final String TYPE_SECRET_VALUE = "kafka";
- private static final String PROVIDER_SECRET_KEY = "provider";
- private static final String PROVIDER_SECRET_VALUE = "strimzi";
- private static final String SECRET_TYPE = "servicebinding.io/kafka";
- private static final String ACCESS_SECRET_EVENT_SOURCE = "access-secret-event-source";
-
private final KubernetesClient kubernetesClient;
- private final Map<String, String> commonSecretData = new HashMap<>();
+ private InformerEventSource<Secret, KafkaAccess> kafkaAccessSecretEventSource;
+ private final SecretDependentResource secretDependentResource;
private final Map<String, String> commonSecretLabels = new HashMap<>();
-
+ private static final String SECRET_TYPE = "servicebinding.io/kafka";
private static final Logger LOGGER = LoggerFactory.getLogger(KafkaAccessOperator.class);
+ /**
+ * Name of the event source for KafkaUser Secret resources
+ */
+ public static final String KAFKA_USER_SECRET_EVENT_SOURCE = "KAFKA_USER_SECRET_EVENT_SOURCE";
+
/**
* @param kubernetesClient The Kubernetes client
*/
public KafkaAccessReconciler(final KubernetesClient kubernetesClient) {
this.kubernetesClient = kubernetesClient;
- final Base64.Encoder encoder = Base64.getEncoder();
- commonSecretData.put(TYPE_SECRET_KEY, encoder.encodeToString(TYPE_SECRET_VALUE.getBytes(StandardCharsets.UTF_8)));
- commonSecretData.put(PROVIDER_SECRET_KEY, encoder.encodeToString(PROVIDER_SECRET_VALUE.getBytes(StandardCharsets.UTF_8)));
- commonSecretLabels.put(KafkaAccessParser.MANAGED_BY_LABEL_KEY, KafkaAccessParser.KAFKA_ACCESS_LABEL_VALUE);
+ secretDependentResource = new SecretDependentResource();
+ commonSecretLabels.put(KafkaAccessMapper.MANAGED_BY_LABEL_KEY, KafkaAccessMapper.KAFKA_ACCESS_LABEL_VALUE);
}
/**
@@ -88,8 +73,7 @@ public UpdateControl<KafkaAccess> reconcile(final KafkaAccess kafkaAccess, final
final String kafkaAccessNamespace = kafkaAccess.getMetadata().getNamespace();
LOGGER.info("Reconciling KafkaAccess {}/{}", kafkaAccessNamespace, kafkaAccessName);
- final Map<String, String> data = secretData(kafkaAccess.getSpec(), kafkaAccessNamespace);
- createOrUpdateSecret(context, data, kafkaAccess);
+ createOrUpdateSecret(secretDependentResource.desired(kafkaAccess.getSpec(), kafkaAccessNamespace, context), kafkaAccess);
final boolean bindingStatusCorrect = Optional.ofNullable(kafkaAccess.getStatus())
.map(KafkaAccessStatus::getBinding)
@@ -106,52 +90,13 @@ public UpdateControl<KafkaAccess> reconcile(final KafkaAccess kafkaAccess, final
}
}
- protected Map<String, String> secretData(final KafkaAccessSpec spec, final String kafkaAccessNamespace) {
- final KafkaReference kafkaReference = spec.getKafka();
- final String kafkaClusterNamespace = Optional.ofNullable(kafkaReference.getNamespace()).orElse(kafkaAccessNamespace);
- final Optional<KafkaUserReference> kafkaUserReference = Optional.ofNullable(spec.getUser());
-
- final Kafka kafka = getKafka(kafkaReference.getName(), kafkaClusterNamespace);
-
- final Map<String, String> data = new HashMap<>(commonSecretData);
- final KafkaListener listener;
- try {
- if (kafkaUserReference.isPresent()) {
- if (!KafkaUser.RESOURCE_KIND.equals(kafkaUserReference.get().getKind()) || !KafkaUser.RESOURCE_GROUP.equals(kafkaUserReference.get().getApiGroup())) {
- throw new CustomResourceParseException(String.format("User kind must be %s and apiGroup must be %s", KafkaUser.RESOURCE_KIND, KafkaUser.RESOURCE_GROUP));
- }
- final String kafkaUserName = kafkaUserReference.get().getName();
- final String kafkaUserNamespace = Optional.ofNullable(kafkaUserReference.get().getNamespace()).orElse(kafkaAccessNamespace);
- final KafkaUser kafkaUser = getKafkaUser(kafkaUserName, kafkaUserNamespace);
- final String kafkaUserType = Optional.ofNullable(kafkaUser)
- .map(KafkaUser::getSpec)
- .map(KafkaUserSpec::getAuthentication)
- .map(KafkaUserAuthentication::getType)
- .orElse(KafkaParser.USER_AUTH_UNDEFINED);
- listener = KafkaParser.getKafkaListener(kafka, spec, kafkaUserType);
- if (kafkaUser != null) {
- final KafkaUserData userData = Optional.ofNullable(kafkaUser.getStatus())
- .map(KafkaUserStatus::getSecret)
- .map(secretName -> kubernetesClient.secrets().inNamespace(kafkaUserNamespace).withName(secretName).get())
- .map(secret -> new KafkaUserData(kafkaUser).withSecret(secret))
- .orElse(new KafkaUserData(kafkaUser));
- data.putAll(userData.getConnectionSecretData());
- }
-
- } else {
- listener = KafkaParser.getKafkaListener(kafka, spec);
- }
- } catch (CustomResourceParseException e) {
- throw new IllegalStateException("Reconcile failed due to ParserException " + e.getMessage());
- }
- data.putAll(listener.getConnectionSecretData());
- return data;
- }
-
- private void createOrUpdateSecret(final Context<KafkaAccess> context, final Map<String, String> data, final KafkaAccess kafkaAccess) {
+ private void createOrUpdateSecret(final Map<String, String> data, final KafkaAccess kafkaAccess) {
final String kafkaAccessName = kafkaAccess.getMetadata().getName();
final String kafkaAccessNamespace = kafkaAccess.getMetadata().getNamespace();
- context.getSecondaryResource(Secret.class, ACCESS_SECRET_EVENT_SOURCE)
+ if (kafkaAccessSecretEventSource == null) {
+ throw new IllegalStateException("Event source for Kafka Access Secret not initialized, cannot reconcile");
+ }
+ kafkaAccessSecretEventSource.get(new ResourceID(kafkaAccessName, kafkaAccessNamespace))
.ifPresentOrElse(secret -> {
final Map<String, String> currentData = secret.getData();
if (!data.equals(currentData)) {
@@ -199,50 +144,42 @@ public Map<String, EventSource> prepareEventSources(final EventSourceContext<Ka
InformerEventSource<Kafka, KafkaAccess> kafkaEventSource = new InformerEventSource<>(
InformerConfiguration.from(Kafka.class, context)
- .withSecondaryToPrimaryMapper(kafka -> KafkaAccessParser.kafkaSecondaryToPrimaryMapper(context.getPrimaryCache().list(), kafka))
+ .withSecondaryToPrimaryMapper(kafka -> KafkaAccessMapper.kafkaSecondaryToPrimaryMapper(context.getPrimaryCache().list(), kafka))
+ .withPrimaryToSecondaryMapper(kafkaAccess -> KafkaAccessMapper.kafkaPrimaryToSecondaryMapper((KafkaAccess) kafkaAccess))
.build(),
context);
InformerEventSource<KafkaUser, KafkaAccess> kafkaUserEventSource = new InformerEventSource<>(
InformerConfiguration.from(KafkaUser.class, context)
- .withSecondaryToPrimaryMapper(kafkaUser -> KafkaAccessParser.kafkaUserSecondaryToPrimaryMapper(context.getPrimaryCache().list(), kafkaUser))
- .withPrimaryToSecondaryMapper(kafkaAccess -> KafkaAccessParser.kafkaUserPrimaryToSecondaryMapper((KafkaAccess) kafkaAccess))
+ .withSecondaryToPrimaryMapper(kafkaUser -> KafkaAccessMapper.kafkaUserSecondaryToPrimaryMapper(context.getPrimaryCache().list(), kafkaUser))
+ .withPrimaryToSecondaryMapper(kafkaAccess -> KafkaAccessMapper.kafkaUserPrimaryToSecondaryMapper((KafkaAccess) kafkaAccess))
.build(),
context);
InformerEventSource<Secret, KafkaAccess> strimziSecretEventSource = new InformerEventSource<>(
InformerConfiguration.from(Secret.class)
- .withLabelSelector(String.format("%s=%s", KafkaAccessParser.MANAGED_BY_LABEL_KEY, KafkaAccessParser.STRIMZI_CLUSTER_LABEL_VALUE))
- .withSecondaryToPrimaryMapper(secret -> KafkaAccessParser.secretSecondaryToPrimaryMapper(context.getPrimaryCache().list(), secret))
+ .withLabelSelector(String.format("%s=%s", KafkaAccessMapper.MANAGED_BY_LABEL_KEY, KafkaAccessMapper.STRIMZI_CLUSTER_LABEL_VALUE))
+ .withSecondaryToPrimaryMapper(secret -> KafkaAccessMapper.secretSecondaryToPrimaryMapper(context.getPrimaryCache().list(), secret))
.build(),
context);
InformerEventSource<Secret, KafkaAccess> strimziKafkaUserSecretEventSource = new InformerEventSource<>(
InformerConfiguration.from(Secret.class)
- .withLabelSelector(String.format("%s=%s", KafkaAccessParser.MANAGED_BY_LABEL_KEY, KafkaAccessParser.STRIMZI_USER_LABEL_VALUE))
- .withSecondaryToPrimaryMapper(secret -> KafkaAccessParser.secretSecondaryToPrimaryMapper(context.getPrimaryCache().list(), secret))
+ .withLabelSelector(String.format("%s=%s", KafkaAccessMapper.MANAGED_BY_LABEL_KEY, KafkaAccessMapper.STRIMZI_USER_LABEL_VALUE))
+ .withSecondaryToPrimaryMapper(secret -> KafkaAccessMapper.secretSecondaryToPrimaryMapper(context.getPrimaryCache().list(), secret))
.build(),
context);
- InformerEventSource<Secret, KafkaAccess> kafkaAccessSecretEventSource = new InformerEventSource<>(
+ kafkaAccessSecretEventSource = new InformerEventSource<>(
InformerConfiguration.from(Secret.class)
- .withLabelSelector(String.format("%s=%s", KafkaAccessParser.MANAGED_BY_LABEL_KEY, KafkaAccessParser.KAFKA_ACCESS_LABEL_VALUE))
- .withSecondaryToPrimaryMapper(secret -> KafkaAccessParser.secretSecondaryToPrimaryMapper(context.getPrimaryCache().list(), secret))
+ .withLabelSelector(String.format("%s=%s", KafkaAccessMapper.MANAGED_BY_LABEL_KEY, KafkaAccessMapper.KAFKA_ACCESS_LABEL_VALUE))
+ .withSecondaryToPrimaryMapper(secret -> KafkaAccessMapper.secretSecondaryToPrimaryMapper(context.getPrimaryCache().list(), secret))
.build(),
context);
- Map<String, EventSource> eventSources = EventSourceInitializer.nameEventSources(kafkaEventSource, kafkaUserEventSource, strimziSecretEventSource, strimziKafkaUserSecretEventSource);
- eventSources.put(ACCESS_SECRET_EVENT_SOURCE, kafkaAccessSecretEventSource);
+ Map<String, EventSource> eventSources = EventSourceInitializer.nameEventSources(
+ kafkaEventSource,
+ kafkaUserEventSource,
+ strimziSecretEventSource,
+ kafkaAccessSecretEventSource
+ );
+ eventSources.put(KAFKA_USER_SECRET_EVENT_SOURCE, strimziKafkaUserSecretEventSource);
LOGGER.info("Finished preparing event sources");
return eventSources;
}
-
- private Kafka getKafka(final String clusterName, final String namespace) {
- return Crds.kafkaOperation(kubernetesClient)
- .inNamespace(namespace)
- .withName(clusterName)
- .get();
- }
-
- private KafkaUser getKafkaUser(final String name, final String namespace) {
- return Crds.kafkaUserOperation(kubernetesClient)
- .inNamespace(namespace)
- .withName(name)
- .get();
- }
}
diff --git a/src/main/java/io/strimzi/kafka/access/SecretDependentResource.java b/src/main/java/io/strimzi/kafka/access/SecretDependentResource.java
new file mode 100644
index 0000000..fd453e1
--- /dev/null
+++ b/src/main/java/io/strimzi/kafka/access/SecretDependentResource.java
@@ -0,0 +1,121 @@
+/*
+ * Copyright Strimzi authors.
+ * License: Apache License 2.0 (see the file LICENSE or http://apache.org/licenses/LICENSE-2.0.html).
+ */
+package io.strimzi.kafka.access;
+
+import io.fabric8.kubernetes.api.model.Secret;
+import io.javaoperatorsdk.operator.api.reconciler.Context;
+import io.javaoperatorsdk.operator.processing.event.ResourceID;
+import io.javaoperatorsdk.operator.processing.event.source.informer.InformerEventSource;
+import io.strimzi.api.kafka.model.Kafka;
+import io.strimzi.api.kafka.model.KafkaUser;
+import io.strimzi.api.kafka.model.KafkaUserAuthentication;
+import io.strimzi.api.kafka.model.KafkaUserSpec;
+import io.strimzi.api.kafka.model.status.KafkaUserStatus;
+import io.strimzi.kafka.access.internal.CustomResourceParseException;
+import io.strimzi.kafka.access.internal.KafkaListener;
+import io.strimzi.kafka.access.internal.KafkaParser;
+import io.strimzi.kafka.access.internal.KafkaUserData;
+import io.strimzi.kafka.access.model.KafkaAccess;
+import io.strimzi.kafka.access.model.KafkaAccessSpec;
+import io.strimzi.kafka.access.model.KafkaReference;
+import io.strimzi.kafka.access.model.KafkaUserReference;
+
+import java.nio.charset.StandardCharsets;
+import java.util.Base64;
+import java.util.HashMap;
+import java.util.Map;
+import java.util.Optional;
+import java.util.function.Supplier;
+
+/**
+ * Class to represent the data in the Secret created by the operator.
+ * In future this could be updated to implement the
+ * Java Operator SDK DependentResource interface.
+ */
+public class SecretDependentResource {
+
+ private static final String TYPE_SECRET_KEY = "type";
+ private static final String TYPE_SECRET_VALUE = "kafka";
+ private static final String PROVIDER_SECRET_KEY = "provider";
+ private static final String PROVIDER_SECRET_VALUE = "strimzi";
+ private final Map<String, String> commonSecretData = new HashMap<>();
+
+ /**
+ * Default constructor that initialises the common secret data
+ */
+ public SecretDependentResource() {
+ final Base64.Encoder encoder = Base64.getEncoder();
+ commonSecretData.put(TYPE_SECRET_KEY, encoder.encodeToString(TYPE_SECRET_VALUE.getBytes(StandardCharsets.UTF_8)));
+ commonSecretData.put(PROVIDER_SECRET_KEY, encoder.encodeToString(PROVIDER_SECRET_VALUE.getBytes(StandardCharsets.UTF_8)));
+ }
+
+ /**
+ * The desired state of the data in the secret
+ * @param spec The spec of the KafkaAccess resource being reconciled
+ * @param namespace The namespace of the KafkaAccess resource being reconciled
+ * @param context The event source context
+ * @return The data for the Secret as a Map
+ */
+ public Map<String, String> desired(final KafkaAccessSpec spec, final String namespace, final Context<KafkaAccess> context) {
+ final KafkaReference kafkaReference = spec.getKafka();
+ final String kafkaClusterName = kafkaReference.getName();
+ final String kafkaClusterNamespace = Optional.ofNullable(kafkaReference.getNamespace()).orElse(namespace);
+ final Kafka kafka = context.getSecondaryResource(Kafka.class).orElseThrow(illegalStateException("Kafka", kafkaClusterNamespace, kafkaClusterName));
+ final Map<String, String> data = new HashMap<>(commonSecretData);
+ final Optional<KafkaUserReference> kafkaUserReference = Optional.ofNullable(spec.getUser());
+ if (kafkaUserReference.isPresent()) {
+ if (!KafkaUser.RESOURCE_KIND.equals(kafkaUserReference.get().getKind()) || !KafkaUser.RESOURCE_GROUP.equals(kafkaUserReference.get().getApiGroup())) {
+ throw new CustomResourceParseException(String.format("User kind must be %s and apiGroup must be %s", KafkaUser.RESOURCE_KIND, KafkaUser.RESOURCE_GROUP));
+ }
+ final String kafkaUserName = kafkaUserReference.get().getName();
+ final String kafkaUserNamespace = Optional.ofNullable(kafkaUserReference.get().getNamespace()).orElse(namespace);
+ final KafkaUser kafkaUser = context.getSecondaryResource(KafkaUser.class).orElseThrow(illegalStateException("KafkaUser", kafkaUserNamespace, kafkaUserName));
+ final String userSecretName = Optional.ofNullable(kafkaUser.getStatus())
+ .map(KafkaUserStatus::getSecret)
+ .orElseThrow(illegalStateException("Secret in KafkaUser status", kafkaUserNamespace, kafkaUserName));
+ final InformerEventSource<Secret, KafkaAccess> kafkaUserSecretEventSource = (InformerEventSource<Secret, KafkaAccess>) context.eventSourceRetriever()
+ .getResourceEventSourceFor(Secret.class, KafkaAccessReconciler.KAFKA_USER_SECRET_EVENT_SOURCE);
+ final Secret kafkaUserSecret = kafkaUserSecretEventSource.get(new ResourceID(userSecretName, kafkaUserNamespace))
+ .orElseThrow(illegalStateException(String.format("Secret %s for KafkaUser", userSecretName), kafkaUserNamespace, kafkaUserName));
+ data.putAll(secretDataWithUser(spec, kafka, kafkaUser, new KafkaUserData(kafkaUser).withSecret(kafkaUserSecret)));
+ } else {
+ data.putAll(secretData(spec, kafka));
+ }
+ return data;
+ }
+
+ protected Map<String, String> secretData(final KafkaAccessSpec spec, final Kafka kafka) {
+ final Map<String, String> data = new HashMap<>(commonSecretData);
+ final KafkaListener listener;
+ try {
+ listener = KafkaParser.getKafkaListener(kafka, spec);
+ } catch (CustomResourceParseException e) {
+ throw new IllegalStateException("Reconcile failed due to ParserException " + e.getMessage());
+ }
+ data.putAll(listener.getConnectionSecretData());
+ return data;
+ }
+
+ protected Map<String, String> secretDataWithUser(final KafkaAccessSpec spec, final Kafka kafka, final KafkaUser kafkaUser, final KafkaUserData kafkaUserData) {
+ final Map<String, String> data = new HashMap<>(commonSecretData);
+ final KafkaListener listener;
+ try {
+ final String kafkaUserType = Optional.ofNullable(kafkaUser.getSpec())
+ .map(KafkaUserSpec::getAuthentication)
+ .map(KafkaUserAuthentication::getType)
+ .orElse(KafkaParser.USER_AUTH_UNDEFINED);
+ listener = KafkaParser.getKafkaListener(kafka, spec, kafkaUserType);
+ data.putAll(kafkaUserData.getConnectionSecretData());
+ } catch (CustomResourceParseException e) {
+ throw new IllegalStateException("Reconcile failed due to ParserException " + e.getMessage());
+ }
+ data.putAll(listener.getConnectionSecretData());
+ return data;
+ }
+
+ private static Supplier<IllegalStateException> illegalStateException(String type, String namespace, String name) {
+ return () -> new IllegalStateException(String.format("%s %s/%s missing", type, namespace, name));
+ }
+}
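Note (not part of the patch): the data map assembled by SecretDependentResource becomes the payload of the servicebinding.io/kafka Secret that the reconciler writes. A minimal sketch of how a client application might decode that Secret back into Kafka client properties follows; the BindingSecretReader class and the Secret name/namespace parameters are illustrative assumptions, while the keys (type, provider, bootstrap.servers and, for SASL users, sasl.jaas.config/password) are the ones produced above and asserted in the tests later in this patch.

// Illustrative sketch, assuming a fabric8 KubernetesClient is available: decode the
// binding Secret produced by the operator into plain Kafka client properties.
import io.fabric8.kubernetes.api.model.Secret;
import io.fabric8.kubernetes.client.KubernetesClient;

import java.nio.charset.StandardCharsets;
import java.util.Base64;
import java.util.Properties;

public class BindingSecretReader {

    public static Properties kafkaProperties(final KubernetesClient client, final String namespace, final String secretName) {
        // Kubernetes Secret data is always base64-encoded, so every entry is decoded here
        final Secret secret = client.secrets().inNamespace(namespace).withName(secretName).get();
        final Properties properties = new Properties();
        secret.getData().forEach((key, value) ->
                properties.put(key, new String(Base64.getDecoder().decode(value), StandardCharsets.UTF_8)));
        // Decoded entries include type=kafka, provider=strimzi, bootstrap.servers=<host:port> and,
        // when a KafkaUser with SASL authentication is referenced, sasl.jaas.config and password
        return properties;
    }
}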
diff --git a/src/main/java/io/strimzi/kafka/access/internal/KafkaAccessParser.java b/src/main/java/io/strimzi/kafka/access/internal/KafkaAccessMapper.java
similarity index 88%
rename from src/main/java/io/strimzi/kafka/access/internal/KafkaAccessParser.java
rename to src/main/java/io/strimzi/kafka/access/internal/KafkaAccessMapper.java
index 2e6360a..8d8c4df 100644
--- a/src/main/java/io/strimzi/kafka/access/internal/KafkaAccessParser.java
+++ b/src/main/java/io/strimzi/kafka/access/internal/KafkaAccessMapper.java
@@ -29,9 +29,9 @@
import java.util.stream.Stream;
/**
- * Representation of a KafkaAccess parser that collects the resource IDs from different sources
+ * Maps Strimzi and Kubernetes resources to and from KafkaAccess resources
*/
-public class KafkaAccessParser {
+public class KafkaAccessMapper {
/**
* The constant for managed-by label
@@ -63,7 +63,7 @@ public class KafkaAccessParser {
*/
public static final String KAFKA_ACCESS_LABEL_VALUE = "kafka-access-operator";
- private static final Logger LOGGER = LoggerFactory.getLogger(KafkaAccessParser.class);
+ private static final Logger LOGGER = LoggerFactory.getLogger(KafkaAccessMapper.class);
/**
* Filters the stream of KafkaAccess objects to find only the ones that reference the provided Kafka resource.
@@ -186,7 +186,7 @@ public static Set<ResourceID> secretSecondaryToPrimaryMapper(final Stream<Kafka
@@ public static Set<ResourceID> kafkaUserPrimaryToSecondaryMapper(final KafkaAcces
});
return resourceIDS;
}
+
+ /**
+ * Finds the Kafka that is referenced by this KafkaAccess object.
+ *
+ * @param kafkaAccess KafkaAccess object to parse
+ *
+ * @return Set of ResourceIDs containing the Kafka that is referenced by the KafkaAccess
+ */
+ public static Set<ResourceID> kafkaPrimaryToSecondaryMapper(final KafkaAccess kafkaAccess) {
+ final Set<ResourceID> resourceIDS = new HashSet<>();
+ Optional.ofNullable(kafkaAccess.getSpec())
+ .map(KafkaAccessSpec::getKafka)
+ .ifPresent(kafkaReference -> {
+ String name = kafkaReference.getName();
+ String namespace = Optional.ofNullable(kafkaReference.getNamespace())
+ .orElseGet(() -> Optional.ofNullable(kafkaAccess.getMetadata()).map(ObjectMeta::getNamespace).orElse(null));
+ if (name == null || namespace == null) {
+ LOGGER.error("Found Kafka for KafkaAccess instance, but metadata is missing.");
+ } else {
+ resourceIDS.add(new ResourceID(name, namespace));
+ }
+ });
+ return resourceIDS;
+ }
}
diff --git a/src/test/java/io/strimzi/kafka/access/KafkaAccessReconcilerTest.java b/src/test/java/io/strimzi/kafka/access/KafkaAccessReconcilerTest.java
index 2d91b7e..bd309ea 100644
--- a/src/test/java/io/strimzi/kafka/access/KafkaAccessReconcilerTest.java
+++ b/src/test/java/io/strimzi/kafka/access/KafkaAccessReconcilerTest.java
@@ -9,7 +9,6 @@
import io.fabric8.kubernetes.api.model.OwnerReference;
import io.fabric8.kubernetes.api.model.OwnerReferenceBuilder;
import io.fabric8.kubernetes.api.model.Secret;
-import io.fabric8.kubernetes.api.model.SecretBuilder;
import io.fabric8.kubernetes.client.KubernetesClient;
import io.fabric8.kubernetes.client.server.mock.EnableKubernetesMockClient;
import io.javaoperatorsdk.operator.Operator;
@@ -59,7 +58,7 @@ public class KafkaAccessReconcilerTest {
private static final String KAFKA_USER_NAME = "my-kafka-user";
private static final int BOOTSTRAP_PORT_9092 = 9092;
private static final int BOOTSTRAP_PORT_9093 = 9093;
- private static final long TEST_TIMEOUT = 500;
+ private static final long TEST_TIMEOUT = 1000;
KubernetesClient client;
Operator operator;
@@ -148,11 +147,14 @@ void testReconcileWithKafkaUser() {
);
Crds.kafkaOperation(client).inNamespace(KAFKA_NAMESPACE).resource(kafka).create();
- final KafkaUser kafkaUser = ResourceProvider.getKafkaUser(KAFKA_NAME, KAFKA_NAMESPACE, new KafkaUserTlsExternalClientAuthentication());
+ final KafkaUser kafkaUser = ResourceProvider.getKafkaUserWithStatus(KAFKA_USER_NAME, KAFKA_NAMESPACE, KAFKA_USER_NAME, "my-user", new KafkaUserTlsExternalClientAuthentication());
Crds.kafkaUserOperation(client).inNamespace(KAFKA_NAMESPACE).resource(kafkaUser).create();
+ final Secret kafkaUserSecret = ResourceProvider.getStrimziUserSecret(KAFKA_USER_NAME, KAFKA_NAMESPACE, KAFKA_NAME);
+ client.secrets().inNamespace(KAFKA_NAMESPACE).resource(kafkaUserSecret).create();
+
final KafkaReference kafkaReference = ResourceProvider.getKafkaReference(KAFKA_NAME, KAFKA_NAMESPACE);
- final KafkaUserReference kafkaUserReference = ResourceProvider.getKafkaUserReference(KAFKA_NAME, KAFKA_NAMESPACE);
+ final KafkaUserReference kafkaUserReference = ResourceProvider.getKafkaUserReference(KAFKA_USER_NAME, KAFKA_NAMESPACE);
final KafkaAccess kafkaAccess = ResourceProvider.getKafkaAccess(NAME, NAMESPACE, kafkaReference, kafkaUserReference);
client.resources(KafkaAccess.class).resource(kafkaAccess).create();
@@ -205,10 +207,9 @@ void testReconcileWithSASLKafkaUser() {
final Map<String, String> userSecretData = new HashMap<>();
userSecretData.put("password", encodedPassword);
userSecretData.put(SaslConfigs.SASL_JAAS_CONFIG, encodedSaslJaasConfig);
- final Secret userSecret = new SecretBuilder().withNewMetadata().withName(KAFKA_USER_NAME).withNamespace(KAFKA_NAMESPACE).endMetadata()
- .withData(userSecretData)
- .build();
- client.secrets().inNamespace(KAFKA_NAMESPACE).resource(userSecret).create();
+ final Secret kafkaUserSecret = ResourceProvider.getStrimziUserSecret(KAFKA_USER_NAME, KAFKA_NAMESPACE, KAFKA_NAME);
+ kafkaUserSecret.setData(userSecretData);
+ client.secrets().inNamespace(KAFKA_NAMESPACE).resource(kafkaUserSecret).create();
final KafkaReference kafkaReference = ResourceProvider.getKafkaReference(KAFKA_NAME, KAFKA_NAMESPACE);
final KafkaUserReference kafkaUserReference = ResourceProvider.getKafkaUserReference(KAFKA_USER_NAME, KAFKA_NAMESPACE);
diff --git a/src/test/java/io/strimzi/kafka/access/ResourceProvider.java b/src/test/java/io/strimzi/kafka/access/ResourceProvider.java
index 6b36c9a..bf49a69 100644
--- a/src/test/java/io/strimzi/kafka/access/ResourceProvider.java
+++ b/src/test/java/io/strimzi/kafka/access/ResourceProvider.java
@@ -27,7 +27,7 @@
import io.strimzi.api.kafka.model.status.ListenerAddressBuilder;
import io.strimzi.api.kafka.model.status.ListenerStatus;
import io.strimzi.api.kafka.model.status.ListenerStatusBuilder;
-import io.strimzi.kafka.access.internal.KafkaAccessParser;
+import io.strimzi.kafka.access.internal.KafkaAccessMapper;
import io.strimzi.kafka.access.model.KafkaAccess;
import io.strimzi.kafka.access.model.KafkaAccessSpec;
import io.strimzi.kafka.access.model.KafkaReference;
@@ -69,7 +69,7 @@ public static KafkaAccess getKafkaAccess(final String kafkaAccessName, final Str
public static Secret getEmptyKafkaAccessSecret(String secretName, String secretNamespace, String kafkaAccessName) {
final Map<String, String> labels = new HashMap<>();
- labels.put(KafkaAccessParser.MANAGED_BY_LABEL_KEY, KafkaAccessParser.KAFKA_ACCESS_LABEL_VALUE);
+ labels.put(KafkaAccessMapper.MANAGED_BY_LABEL_KEY, KafkaAccessMapper.KAFKA_ACCESS_LABEL_VALUE);
final OwnerReference ownerReference = new OwnerReference();
ownerReference.setName(kafkaAccessName);
ownerReference.setKind(KafkaAccess.KIND);
@@ -85,15 +85,15 @@ public static Secret getEmptyKafkaAccessSecret(String secretName, String secretN
public static Secret getStrimziSecret(final String secretName, final String secretNamespace, final String kafkaInstanceName) {
final Map<String, String> labels = new HashMap<>();
- labels.put(KafkaAccessParser.MANAGED_BY_LABEL_KEY, KafkaAccessParser.STRIMZI_CLUSTER_LABEL_VALUE);
- labels.put(KafkaAccessParser.INSTANCE_LABEL_KEY, kafkaInstanceName);
+ labels.put(KafkaAccessMapper.MANAGED_BY_LABEL_KEY, KafkaAccessMapper.STRIMZI_CLUSTER_LABEL_VALUE);
+ labels.put(KafkaAccessMapper.INSTANCE_LABEL_KEY, kafkaInstanceName);
return buildSecret(secretName, secretNamespace, labels);
}
public static Secret getStrimziUserSecret(final String secretName, final String secretNamespace, final String kafkaInstanceName) {
final Map<String, String> labels = new HashMap<>();
- labels.put(KafkaAccessParser.MANAGED_BY_LABEL_KEY, KafkaAccessParser.STRIMZI_USER_LABEL_VALUE);
- labels.put(KafkaAccessParser.STRIMZI_CLUSTER_LABEL_KEY, kafkaInstanceName);
+ labels.put(KafkaAccessMapper.MANAGED_BY_LABEL_KEY, KafkaAccessMapper.STRIMZI_USER_LABEL_VALUE);
+ labels.put(KafkaAccessMapper.STRIMZI_CLUSTER_LABEL_KEY, kafkaInstanceName);
return buildSecret(secretName, secretNamespace, labels);
}
diff --git a/src/test/java/io/strimzi/kafka/access/SecretDataTest.java b/src/test/java/io/strimzi/kafka/access/SecretDependentResourceTest.java
similarity index 50%
rename from src/test/java/io/strimzi/kafka/access/SecretDataTest.java
rename to src/test/java/io/strimzi/kafka/access/SecretDependentResourceTest.java
index 89a2dde..969952f 100644
--- a/src/test/java/io/strimzi/kafka/access/SecretDataTest.java
+++ b/src/test/java/io/strimzi/kafka/access/SecretDependentResourceTest.java
@@ -4,14 +4,18 @@
*/
package io.strimzi.kafka.access;
-import io.fabric8.kubernetes.client.KubernetesClient;
-import io.fabric8.kubernetes.client.server.mock.EnableKubernetesMockClient;
-import io.strimzi.api.kafka.Crds;
+import io.fabric8.kubernetes.api.model.Secret;
+import io.javaoperatorsdk.operator.api.reconciler.Context;
+import io.javaoperatorsdk.operator.processing.event.EventSourceRetriever;
+import io.javaoperatorsdk.operator.processing.event.ResourceID;
+import io.javaoperatorsdk.operator.processing.event.source.informer.InformerEventSource;
import io.strimzi.api.kafka.model.Kafka;
import io.strimzi.api.kafka.model.KafkaUser;
import io.strimzi.api.kafka.model.KafkaUserScramSha512ClientAuthentication;
import io.strimzi.api.kafka.model.listener.KafkaListenerAuthenticationScramSha512;
import io.strimzi.api.kafka.model.listener.arraylistener.KafkaListenerType;
+import io.strimzi.kafka.access.internal.CustomResourceParseException;
+import io.strimzi.kafka.access.internal.KafkaUserData;
import io.strimzi.kafka.access.model.KafkaAccess;
import io.strimzi.kafka.access.model.KafkaReference;
import io.strimzi.kafka.access.model.KafkaUserReference;
@@ -26,13 +30,16 @@
import java.util.HashMap;
import java.util.List;
import java.util.Map;
+import java.util.Optional;
import java.util.stream.Stream;
import static org.assertj.core.api.Assertions.assertThat;
import static org.junit.jupiter.api.Assertions.assertThrows;
+import static org.mockito.ArgumentMatchers.any;
+import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.when;
-@EnableKubernetesMockClient(crud = true)
-public class SecretDataTest {
+public class SecretDependentResourceTest {
private static final String NAME = "kafka-access-name";
private static final String NAMESPACE = "kafka-access-namespace";
@@ -40,11 +47,12 @@ public class SecretDataTest {
private static final String LISTENER_2 = "listener-2";
private static final String BOOTSTRAP_HOST = "my-kafka-name.svc";
private static final String KAFKA_NAME = "my-kafka-name";
+ private static final String KAFKA_USER_NAME = "my-kafka-user";
+ private static final String KAFKA_USER_SECRET_NAME = "my-user-secret";
private static final String KAFKA_NAMESPACE = "kafka-namespace";
private static final int BOOTSTRAP_PORT_9092 = 9092;
private static final int BOOTSTRAP_PORT_9093 = 9093;
- KubernetesClient client;
@Test
@DisplayName("When secretData is called with a KafkaAccess resource, then the data returned includes the " +
@@ -56,12 +64,11 @@ void testSecretData() {
List.of(ResourceProvider.getListener(LISTENER_1, KafkaListenerType.INTERNAL, false)),
List.of(ResourceProvider.getListenerStatus(LISTENER_1, BOOTSTRAP_HOST, BOOTSTRAP_PORT_9092))
);
- Crds.kafkaOperation(client).inNamespace(KAFKA_NAMESPACE).resource(kafka).create();
final KafkaReference kafkaReference = ResourceProvider.getKafkaReference(KAFKA_NAME, KAFKA_NAMESPACE);
final KafkaAccess kafkaAccess = ResourceProvider.getKafkaAccess(NAME, NAMESPACE, kafkaReference);
- Map<String, String> data = new KafkaAccessReconciler(client).secretData(kafkaAccess.getSpec(), NAMESPACE);
+ Map<String, String> data = new SecretDependentResource().secretData(kafkaAccess.getSpec(), kafka);
final Base64.Encoder encoder = Base64.getEncoder();
final Map<String, String> expectedDataEntries = new HashMap<>();
expectedDataEntries.put("type", encoder.encodeToString("kafka".getBytes(StandardCharsets.UTF_8)));
@@ -81,16 +88,14 @@ void testSecretDataWithKafkaUser() {
List.of(ResourceProvider.getListener(LISTENER_2, KafkaListenerType.INTERNAL, false, new KafkaListenerAuthenticationScramSha512())),
List.of(ResourceProvider.getListenerStatus(LISTENER_2, BOOTSTRAP_HOST, BOOTSTRAP_PORT_9093))
);
- Crds.kafkaOperation(client).inNamespace(KAFKA_NAMESPACE).resource(kafka).create();
final KafkaUser kafkaUser = ResourceProvider.getKafkaUser(KAFKA_NAME, KAFKA_NAMESPACE, new KafkaUserScramSha512ClientAuthentication());
- Crds.kafkaUserOperation(client).inNamespace(KAFKA_NAMESPACE).resource(kafkaUser).create();
final KafkaReference kafkaReference = ResourceProvider.getKafkaReferenceWithListener(KAFKA_NAME, LISTENER_2, KAFKA_NAMESPACE);
final KafkaUserReference kafkaUserReference = ResourceProvider.getKafkaUserReference(KAFKA_NAME, KAFKA_NAMESPACE);
final KafkaAccess kafkaAccess = ResourceProvider.getKafkaAccess(NAME, NAMESPACE, kafkaReference, kafkaUserReference);
- Map<String, String> data = new KafkaAccessReconciler(client).secretData(kafkaAccess.getSpec(), NAMESPACE);
+ Map<String, String> data = new SecretDependentResource().secretDataWithUser(kafkaAccess.getSpec(), kafka, kafkaUser, new KafkaUserData(kafkaUser));
final Base64.Encoder encoder = Base64.getEncoder();
assertThat(data).containsEntry(
CommonClientConfigs.BOOTSTRAP_SERVERS_CONFIG,
@@ -99,19 +104,24 @@ void testSecretDataWithKafkaUser() {
}
@Test
- @DisplayName("When secretData is called with a KafkaAccess resource and the referenced Kafka resource is missing, " +
+ @DisplayName("When desired is called with a KafkaAccess resource and the referenced Kafka resource is missing, " +
"then it throws an exception")
- void testSecretDataMissingKafka() {
+ void testDesiredMissingKafka() {
final KafkaReference kafkaReference = ResourceProvider.getKafkaReference(KAFKA_NAME, KAFKA_NAMESPACE);
final KafkaAccess kafkaAccess = ResourceProvider.getKafkaAccess(NAME, NAMESPACE, kafkaReference);
- assertThrows(IllegalStateException.class, () -> new KafkaAccessReconciler(client).secretData(kafkaAccess.getSpec(), NAMESPACE));
+ final Context<KafkaAccess> mockContext = mock(Context.class);
+ when(mockContext.getSecondaryResource(Kafka.class)).thenReturn(Optional.empty());
+
+ final IllegalStateException exception = assertThrows(IllegalStateException.class,
+ () -> new SecretDependentResource().desired(kafkaAccess.getSpec(), NAMESPACE, mockContext));
+ assertThat(exception).hasMessage(String.format("Kafka %s/%s missing", KAFKA_NAMESPACE, KAFKA_NAME));
}
@Test
- @DisplayName("When secretData is called with a KafkaAccess resource and the referenced KafkaUser resource is missing, " +
+ @DisplayName("When desired is called with a KafkaAccess resource and the referenced KafkaUser resource is missing, " +
"then it throws an exception")
- void testSecretDataMissingKafkaUser() {
+ void testDesiredMissingKafkaUser() {
final Kafka kafka = ResourceProvider.getKafka(
KAFKA_NAME,
KAFKA_NAMESPACE,
@@ -124,13 +134,84 @@ void testSecretDataMissingKafkaUser() {
ResourceProvider.getListenerStatus(LISTENER_2, BOOTSTRAP_HOST, BOOTSTRAP_PORT_9093)
)
);
- Crds.kafkaOperation(client).inNamespace(KAFKA_NAMESPACE).resource(kafka).create();
+ final Context<KafkaAccess> mockContext = mock(Context.class);
+ when(mockContext.getSecondaryResource(Kafka.class)).thenReturn(Optional.of(kafka));
+ when(mockContext.getSecondaryResource(KafkaUser.class)).thenReturn(Optional.empty());
final KafkaReference kafkaReference = ResourceProvider.getKafkaReference(KAFKA_NAME, KAFKA_NAMESPACE);
- final KafkaUserReference kafkaUserReference = ResourceProvider.getKafkaUserReference(KAFKA_NAME, KAFKA_NAMESPACE);
+ final KafkaUserReference kafkaUserReference = ResourceProvider.getKafkaUserReference(KAFKA_USER_NAME, KAFKA_NAMESPACE);
+ final KafkaAccess kafkaAccess = ResourceProvider.getKafkaAccess(NAME, NAMESPACE, kafkaReference, kafkaUserReference);
+
+ final IllegalStateException exception = assertThrows(IllegalStateException.class,
+ () -> new SecretDependentResource().desired(kafkaAccess.getSpec(), NAMESPACE, mockContext));
+ assertThat(exception).hasMessage(String.format("KafkaUser %s/%s missing", KAFKA_NAMESPACE, KAFKA_USER_NAME));
+ }
+
+ @Test
+ @DisplayName("When desired is called with a KafkaAccess resource and the referenced KafkaUser resource's status is missing the secret name, " +
+ "then it throws an exception")
+ void testDesiredMissingKafkaUserSecretName() {
+ final Kafka kafka = ResourceProvider.getKafka(
+ KAFKA_NAME,
+ KAFKA_NAMESPACE,
+ List.of(
+ ResourceProvider.getListener(LISTENER_1, KafkaListenerType.INTERNAL, false),
+ ResourceProvider.getListener(LISTENER_2, KafkaListenerType.INTERNAL, false, new KafkaListenerAuthenticationScramSha512())
+ ),
+ List.of(
+ ResourceProvider.getListenerStatus(LISTENER_1, BOOTSTRAP_HOST, BOOTSTRAP_PORT_9092),
+ ResourceProvider.getListenerStatus(LISTENER_2, BOOTSTRAP_HOST, BOOTSTRAP_PORT_9093)
+ )
+ );
+ final Context<KafkaAccess> mockContext = mock(Context.class);
+ when(mockContext.getSecondaryResource(Kafka.class)).thenReturn(Optional.of(kafka));
+
+ final KafkaUser kafkaUser = ResourceProvider.getKafkaUser(KAFKA_USER_NAME, KAFKA_NAMESPACE);
+ when(mockContext.getSecondaryResource(KafkaUser.class)).thenReturn(Optional.of(kafkaUser));
+
+ final KafkaReference kafkaReference = ResourceProvider.getKafkaReference(KAFKA_NAME, KAFKA_NAMESPACE);
+ final KafkaUserReference kafkaUserReference = ResourceProvider.getKafkaUserReference(KAFKA_USER_NAME, KAFKA_NAMESPACE);
final KafkaAccess kafkaAccess = ResourceProvider.getKafkaAccess(NAME, NAMESPACE, kafkaReference, kafkaUserReference);
- assertThrows(IllegalStateException.class, () -> new KafkaAccessReconciler(client).secretData(kafkaAccess.getSpec(), NAMESPACE));
+ final IllegalStateException exception = assertThrows(IllegalStateException.class,
+ () -> new SecretDependentResource().desired(kafkaAccess.getSpec(), NAMESPACE, mockContext));
+ assertThat(exception).hasMessage(String.format("Secret in KafkaUser status %s/%s missing", KAFKA_NAMESPACE, KAFKA_USER_NAME));
+ }
+
+ @Test
+ @DisplayName("When desired is called with a KafkaAccess resource and the referenced KafkaUser resource's Secret, " +
+ "then it throws an exception")
+ void testDesiredMissingKafkaUserSecret() {
+ final Kafka kafka = ResourceProvider.getKafka(
+ KAFKA_NAME,
+ KAFKA_NAMESPACE,
+ List.of(
+ ResourceProvider.getListener(LISTENER_1, KafkaListenerType.INTERNAL, false),
+ ResourceProvider.getListener(LISTENER_2, KafkaListenerType.INTERNAL, false, new KafkaListenerAuthenticationScramSha512())
+ ),
+ List.of(
+ ResourceProvider.getListenerStatus(LISTENER_1, BOOTSTRAP_HOST, BOOTSTRAP_PORT_9092),
+ ResourceProvider.getListenerStatus(LISTENER_2, BOOTSTRAP_HOST, BOOTSTRAP_PORT_9093)
+ )
+ );
+ final Context<KafkaAccess> mockContext = mock(Context.class);
+ final EventSourceRetriever<KafkaAccess> mockEventSourceRetriever = mock(EventSourceRetriever.class);
+ final InformerEventSource<Secret, KafkaAccess> mockInformerEventSource = mock(InformerEventSource.class);
+ when(mockContext.getSecondaryResource(Kafka.class)).thenReturn(Optional.of(kafka));
+ when(mockContext.eventSourceRetriever()).thenReturn(mockEventSourceRetriever);
+ when(mockEventSourceRetriever.getResourceEventSourceFor(Secret.class, KafkaAccessReconciler.KAFKA_USER_SECRET_EVENT_SOURCE)).thenReturn(mockInformerEventSource);
+ when(mockInformerEventSource.get(any(ResourceID.class))).thenReturn(Optional.empty());
+
+ final KafkaUser kafkaUser = ResourceProvider.getKafkaUserWithStatus(KAFKA_USER_NAME, KAFKA_NAMESPACE, KAFKA_USER_SECRET_NAME, "user", new KafkaUserScramSha512ClientAuthentication());
+ when(mockContext.getSecondaryResource(KafkaUser.class)).thenReturn(Optional.of(kafkaUser));
+
+ final KafkaReference kafkaReference = ResourceProvider.getKafkaReference(KAFKA_NAME, KAFKA_NAMESPACE);
+ final KafkaUserReference kafkaUserReference = ResourceProvider.getKafkaUserReference(KAFKA_USER_NAME, KAFKA_NAMESPACE);
+ final KafkaAccess kafkaAccess = ResourceProvider.getKafkaAccess(NAME, NAMESPACE, kafkaReference, kafkaUserReference);
+
+ final IllegalStateException exception = assertThrows(IllegalStateException.class,
+ () -> new SecretDependentResource().desired(kafkaAccess.getSpec(), NAMESPACE, mockContext));
+ assertThat(exception).hasMessage(String.format("Secret %s for KafkaUser %s/%s missing", KAFKA_USER_SECRET_NAME, KAFKA_NAMESPACE, KAFKA_USER_NAME));
}
private static Stream userReferences() {
@@ -157,14 +238,13 @@ void testInvalidUserReference(KafkaUserReference userReference) {
ResourceProvider.getListenerStatus(LISTENER_2, BOOTSTRAP_HOST, BOOTSTRAP_PORT_9093)
)
);
- Crds.kafkaOperation(client).inNamespace(KAFKA_NAMESPACE).resource(kafka).create();
-
- final KafkaUser kafkaUser = ResourceProvider.getKafkaUser(KAFKA_NAME, KAFKA_NAMESPACE, new KafkaUserScramSha512ClientAuthentication());
- Crds.kafkaUserOperation(client).inNamespace(KAFKA_NAMESPACE).resource(kafkaUser).create();
+ final Context mockContext = mock(Context.class);
+ when(mockContext.getSecondaryResource(Kafka.class)).thenReturn(Optional.of(kafka));
final KafkaReference kafkaReference = ResourceProvider.getKafkaReference(KAFKA_NAME, KAFKA_NAMESPACE);
final KafkaAccess kafkaAccess = ResourceProvider.getKafkaAccess(NAME, NAMESPACE, kafkaReference, userReference);
-
- assertThrows(IllegalStateException.class, () -> new KafkaAccessReconciler(client).secretData(kafkaAccess.getSpec(), NAMESPACE));
+ final CustomResourceParseException exception = assertThrows(CustomResourceParseException.class,
+ () -> new SecretDependentResource().desired(kafkaAccess.getSpec(), NAMESPACE, mockContext));
+ assertThat(exception).hasMessage("User kind must be KafkaUser and apiGroup must be kafka.strimzi.io");
}
}
diff --git a/src/test/java/io/strimzi/kafka/access/internal/KafkaAccessParserTest.java b/src/test/java/io/strimzi/kafka/access/internal/KafkaAccessMapperTest.java
similarity index 86%
rename from src/test/java/io/strimzi/kafka/access/internal/KafkaAccessParserTest.java
rename to src/test/java/io/strimzi/kafka/access/internal/KafkaAccessMapperTest.java
index fa80f0c..856e03e 100644
--- a/src/test/java/io/strimzi/kafka/access/internal/KafkaAccessParserTest.java
+++ b/src/test/java/io/strimzi/kafka/access/internal/KafkaAccessMapperTest.java
@@ -21,7 +21,7 @@
import static org.assertj.core.api.Assertions.assertThat;
-public class KafkaAccessParserTest {
+public class KafkaAccessMapperTest {
static final String ACCESS_NAME_1 = "my-access-1";
static final String ACCESS_NAME_2 = "my-access-2";
@@ -42,7 +42,7 @@ void testCorrectKafkaAccessReturnedForKafka() {
final KafkaAccess kafkaAccess1 = ResourceProvider.getKafkaAccess(ACCESS_NAME_1, NAMESPACE_1, kafkaReference1);
final KafkaAccess kafkaAccess2 = ResourceProvider.getKafkaAccess(ACCESS_NAME_2, NAMESPACE_1, kafkaReference2);
- final Set<ResourceID> matches = KafkaAccessParser.kafkaSecondaryToPrimaryMapper(
+ final Set<ResourceID> matches = KafkaAccessMapper.kafkaSecondaryToPrimaryMapper(
Stream.of(kafkaAccess1, kafkaAccess2),
ResourceProvider.getKafka(KAFKA_NAME_1, NAMESPACE_2));
assertThat(matches).containsExactly(new ResourceID(ACCESS_NAME_1, NAMESPACE_1));
@@ -56,7 +56,7 @@ void testTwoCorrectKafkaAccessReturnedForKafka() {
final KafkaAccess kafkaAccess1 = ResourceProvider.getKafkaAccess(ACCESS_NAME_1, NAMESPACE_1, kafkaReference);
final KafkaAccess kafkaAccess2 = ResourceProvider.getKafkaAccess(ACCESS_NAME_2, NAMESPACE_2, kafkaReference);
- final Set<ResourceID> matches = KafkaAccessParser.kafkaSecondaryToPrimaryMapper(
+ final Set<ResourceID> matches = KafkaAccessMapper.kafkaSecondaryToPrimaryMapper(
Stream.of(kafkaAccess1, kafkaAccess2),
ResourceProvider.getKafka(KAFKA_NAME_1, NAMESPACE_2));
assertThat(matches).containsExactly(new ResourceID(ACCESS_NAME_1, NAMESPACE_1), new ResourceID(ACCESS_NAME_2, NAMESPACE_2));
@@ -70,7 +70,7 @@ void testKafkaAccessInMatchingNamespaceReturnedForKafka() {
final KafkaAccess kafkaAccess1 = ResourceProvider.getKafkaAccess(ACCESS_NAME_1, NAMESPACE_1, kafkaReferenceNullNamespace);
final KafkaAccess kafkaAccess2 = ResourceProvider.getKafkaAccess(ACCESS_NAME_2, NAMESPACE_2, kafkaReferenceNullNamespace);
- final Set<ResourceID> matches = KafkaAccessParser.kafkaSecondaryToPrimaryMapper(
+ final Set<ResourceID> matches = KafkaAccessMapper.kafkaSecondaryToPrimaryMapper(
Stream.of(kafkaAccess1, kafkaAccess2),
ResourceProvider.getKafka(KAFKA_NAME_1, NAMESPACE_1));
assertThat(matches).containsExactly(new ResourceID(ACCESS_NAME_1, NAMESPACE_1));
@@ -85,7 +85,7 @@ void testKafkaAccessNoneMatchKafka() {
final KafkaAccess kafkaAccess1 = ResourceProvider.getKafkaAccess(ACCESS_NAME_1, NAMESPACE_1, kafkaReference1);
final KafkaAccess kafkaAccess2 = ResourceProvider.getKafkaAccess(ACCESS_NAME_2, NAMESPACE_2, kafkaReference2);
- final Set<ResourceID> matches = KafkaAccessParser.kafkaSecondaryToPrimaryMapper(
+ final Set<ResourceID> matches = KafkaAccessMapper.kafkaSecondaryToPrimaryMapper(
Stream.of(kafkaAccess1, kafkaAccess2),
ResourceProvider.getKafka(KAFKA_NAME_1, NAMESPACE_1));
assertThat(matches).isEmpty();
@@ -102,7 +102,7 @@ void testCorrectKafkaAccessReturnedForKafkaUser() {
final KafkaAccess kafkaAccess1 = ResourceProvider.getKafkaAccess(ACCESS_NAME_1, NAMESPACE_1, kafkaReference1, kafkaUserReference1);
final KafkaAccess kafkaAccess2 = ResourceProvider.getKafkaAccess(ACCESS_NAME_2, NAMESPACE_1, kafkaReference2, kafkaUserReference2);
- final Set<ResourceID> matches = KafkaAccessParser.kafkaUserSecondaryToPrimaryMapper(
+ final Set<ResourceID> matches = KafkaAccessMapper.kafkaUserSecondaryToPrimaryMapper(
Stream.of(kafkaAccess1, kafkaAccess2),
ResourceProvider.getKafkaUser(KAFKA_USER_NAME_1, NAMESPACE_2));
assertThat(matches).containsExactly(new ResourceID(ACCESS_NAME_1, NAMESPACE_1));
@@ -118,7 +118,7 @@ void testTwoCorrectKafkaAccessReturnedForKafkaUser() {
final KafkaAccess kafkaAccess1 = ResourceProvider.getKafkaAccess(ACCESS_NAME_1, NAMESPACE_1, kafkaReference1, kafkaUserReference);
final KafkaAccess kafkaAccess2 = ResourceProvider.getKafkaAccess(ACCESS_NAME_2, NAMESPACE_2, kafkaReference2, kafkaUserReference);
- final Set<ResourceID> matches = KafkaAccessParser.kafkaUserSecondaryToPrimaryMapper(
+ final Set<ResourceID> matches = KafkaAccessMapper.kafkaUserSecondaryToPrimaryMapper(
Stream.of(kafkaAccess1, kafkaAccess2),
ResourceProvider.getKafkaUser(KAFKA_USER_NAME_1, NAMESPACE_2));
assertThat(matches).containsExactly(new ResourceID(ACCESS_NAME_1, NAMESPACE_1), new ResourceID(ACCESS_NAME_2, NAMESPACE_2));
@@ -134,7 +134,7 @@ void testKafkaAccessInMatchingNamespaceReturnedForKafkaUser() {
final KafkaAccess kafkaAccess1 = ResourceProvider.getKafkaAccess(ACCESS_NAME_1, NAMESPACE_1, kafkaReference1, kafkaUserReferenceNullNamespace);
final KafkaAccess kafkaAccess2 = ResourceProvider.getKafkaAccess(ACCESS_NAME_2, NAMESPACE_2, kafkaReference2, kafkaUserReferenceNullNamespace);
- final Set<ResourceID> matches = KafkaAccessParser.kafkaUserSecondaryToPrimaryMapper(
+ final Set<ResourceID> matches = KafkaAccessMapper.kafkaUserSecondaryToPrimaryMapper(
Stream.of(kafkaAccess1, kafkaAccess2),
ResourceProvider.getKafkaUser(KAFKA_USER_NAME_1, NAMESPACE_1));
assertThat(matches).containsExactly(new ResourceID(ACCESS_NAME_1, NAMESPACE_1));
@@ -150,7 +150,7 @@ void testKafkaAccessNoneMatchKafkaUser() {
final KafkaAccess kafkaAccess1 = ResourceProvider.getKafkaAccess(ACCESS_NAME_1, NAMESPACE_1, kafkaReference1, kafkaUserReference1);
final KafkaAccess kafkaAccess2 = ResourceProvider.getKafkaAccess(ACCESS_NAME_2, NAMESPACE_2, kafkaReference2);
- final Set<ResourceID> matches = KafkaAccessParser.kafkaUserSecondaryToPrimaryMapper(
+ final Set<ResourceID> matches = KafkaAccessMapper.kafkaUserSecondaryToPrimaryMapper(
Stream.of(kafkaAccess1, kafkaAccess2),
ResourceProvider.getKafkaUser(KAFKA_USER_NAME_1, NAMESPACE_1));
assertThat(matches).isEmpty();
@@ -163,7 +163,7 @@ void testCorrectKafkaAccessReturnedForKafkaAccessSecret() {
final KafkaAccess kafkaAccess1 = ResourceProvider.getKafkaAccess(ACCESS_NAME_1, NAMESPACE_1);
final KafkaAccess kafkaAccess2 = ResourceProvider.getKafkaAccess(ACCESS_NAME_2, NAMESPACE_1);
- final Set<ResourceID> matches = KafkaAccessParser.secretSecondaryToPrimaryMapper(
+ final Set<ResourceID> matches = KafkaAccessMapper.secretSecondaryToPrimaryMapper(
Stream.of(kafkaAccess1, kafkaAccess2),
ResourceProvider.getEmptyKafkaAccessSecret(SECRET_NAME, NAMESPACE_1, ACCESS_NAME_1));
assertThat(matches).containsExactly(new ResourceID(ACCESS_NAME_1, NAMESPACE_1));
@@ -173,7 +173,7 @@ void testCorrectKafkaAccessReturnedForKafkaAccessSecret() {
@DisplayName("When secretSecondaryToPrimaryMapper() is called with an empty cache and a secret that is managed " +
"by a KafkaAccess, then the correct KafkaAccess is returned")
void testCorrectKafkaAccessReturnedForKafkaAccessSecretEmptyCache() {
- final Set<ResourceID> matches = KafkaAccessParser.secretSecondaryToPrimaryMapper(
+ final Set<ResourceID> matches = KafkaAccessMapper.secretSecondaryToPrimaryMapper(
Stream.of(),
ResourceProvider.getEmptyKafkaAccessSecret(SECRET_NAME, NAMESPACE_1, ACCESS_NAME_1));
assertThat(matches).containsExactly(new ResourceID(ACCESS_NAME_1, NAMESPACE_1));
@@ -188,7 +188,7 @@ void testCorrectKafkaAccessReturnedForStrimziSecret() {
final KafkaAccess kafkaAccess1 = ResourceProvider.getKafkaAccess(ACCESS_NAME_1, NAMESPACE_1, kafkaReference1);
final KafkaAccess kafkaAccess2 = ResourceProvider.getKafkaAccess(ACCESS_NAME_2, NAMESPACE_1, kafkaReference2);
- final Set<ResourceID> matches = KafkaAccessParser.secretSecondaryToPrimaryMapper(
+ final Set<ResourceID> matches = KafkaAccessMapper.secretSecondaryToPrimaryMapper(
Stream.of(kafkaAccess1, kafkaAccess2),
ResourceProvider.getStrimziSecret(SECRET_NAME, NAMESPACE_1, KAFKA_NAME_1));
assertThat(matches).containsExactly(new ResourceID(ACCESS_NAME_1, NAMESPACE_1));
@@ -203,7 +203,7 @@ void testCorrectKafkaAccessReturnedForStrimziUserSecret() {
final KafkaAccess kafkaAccess1 = ResourceProvider.getKafkaAccess(ACCESS_NAME_1, NAMESPACE_1, kafkaReference1);
final KafkaAccess kafkaAccess2 = ResourceProvider.getKafkaAccess(ACCESS_NAME_2, NAMESPACE_1, kafkaReference2);
- final Set<ResourceID> matches = KafkaAccessParser.secretSecondaryToPrimaryMapper(
+ final Set<ResourceID> matches = KafkaAccessMapper.secretSecondaryToPrimaryMapper(
Stream.of(kafkaAccess1, kafkaAccess2),
ResourceProvider.getStrimziUserSecret(SECRET_NAME, NAMESPACE_1, KAFKA_NAME_1));
assertThat(matches).containsExactly(new ResourceID(ACCESS_NAME_1, NAMESPACE_1));
@@ -219,7 +219,7 @@ void testEmptySetForSecretManagedByUnknown() {
final KafkaAccess kafkaAccess2 = ResourceProvider.getKafkaAccess(ACCESS_NAME_2, NAMESPACE_1, kafkaReference2);
final Map<String, String> labels = new HashMap<>();
- labels.put(KafkaAccessParser.MANAGED_BY_LABEL_KEY, "unknown");
+ labels.put(KafkaAccessMapper.MANAGED_BY_LABEL_KEY, "unknown");
final Secret secret = new SecretBuilder()
.withNewMetadata()
.withName(SECRET_NAME)
@@ -228,7 +228,7 @@ void testEmptySetForSecretManagedByUnknown() {
.endMetadata()
.build();
- final Set<ResourceID> matches = KafkaAccessParser.secretSecondaryToPrimaryMapper(
+ final Set<ResourceID> matches = KafkaAccessMapper.secretSecondaryToPrimaryMapper(
Stream.of(kafkaAccess1, kafkaAccess2),
secret);
assertThat(matches).isEmpty();
@@ -250,7 +250,7 @@ void testEmptySetForUnmanagedSecret() {
.endMetadata()
.build();
- final Set<ResourceID> matches = KafkaAccessParser.secretSecondaryToPrimaryMapper(
+ final Set<ResourceID> matches = KafkaAccessMapper.secretSecondaryToPrimaryMapper(
Stream.of(kafkaAccess1, kafkaAccess2),
secret);
assertThat(matches).isEmpty();
@@ -263,7 +263,7 @@ void testKafkaAccessWithMissingKafkaUser() {
final KafkaReference kafkaReference = ResourceProvider.getKafkaReference(KAFKA_NAME_1, NAMESPACE_1);
final KafkaAccess kafkaAccess = ResourceProvider.getKafkaAccess(ACCESS_NAME_1, NAMESPACE_1, kafkaReference);
- final Set<ResourceID> matches = KafkaAccessParser.kafkaUserPrimaryToSecondaryMapper(kafkaAccess);
+ final Set<ResourceID> matches = KafkaAccessMapper.kafkaUserPrimaryToSecondaryMapper(kafkaAccess);
assertThat(matches).isEmpty();
}
@@ -275,7 +275,7 @@ void testKafkaAccessWithKafkaUser() {
final KafkaUserReference kafkaUserReference = ResourceProvider.getKafkaUserReference(KAFKA_USER_NAME_1, NAMESPACE_2);
final KafkaAccess kafkaAccess = ResourceProvider.getKafkaAccess(ACCESS_NAME_1, NAMESPACE_1, kafkaReference, kafkaUserReference);
- final Set<ResourceID> matches = KafkaAccessParser.kafkaUserPrimaryToSecondaryMapper(kafkaAccess);
+ final Set<ResourceID> matches = KafkaAccessMapper.kafkaUserPrimaryToSecondaryMapper(kafkaAccess);
assertThat(matches).hasSize(1);
assertThat(matches).containsExactly(new ResourceID(KAFKA_USER_NAME_1, NAMESPACE_2));
}
@@ -288,8 +288,32 @@ void testKafkaAccessWithKafkaUserMissingNamespace() {
final KafkaUserReference kafkaUserReference = ResourceProvider.getKafkaUserReference(KAFKA_USER_NAME_1, null);
final KafkaAccess kafkaAccess = ResourceProvider.getKafkaAccess(ACCESS_NAME_1, NAMESPACE_1, kafkaReference, kafkaUserReference);
- final Set<ResourceID> matches = KafkaAccessParser.kafkaUserPrimaryToSecondaryMapper(kafkaAccess);
+ final Set<ResourceID> matches = KafkaAccessMapper.kafkaUserPrimaryToSecondaryMapper(kafkaAccess);
assertThat(matches).hasSize(1);
assertThat(matches).containsExactly(new ResourceID(KAFKA_USER_NAME_1, NAMESPACE_1));
}
+
+ @Test
+ @DisplayName("When kafkaPrimaryToSecondaryMapper() is called with a KafkaAccess, " +
+ "then the returned set includes the Kafka that is referenced")
+ void testKafkaAccessWithKafka() {
+ final KafkaReference kafkaReference = ResourceProvider.getKafkaReference(KAFKA_NAME_1, NAMESPACE_1);
+ final KafkaAccess kafkaAccess = ResourceProvider.getKafkaAccess(ACCESS_NAME_1, NAMESPACE_2, kafkaReference);
+
+ final Set<ResourceID> matches = KafkaAccessMapper.kafkaPrimaryToSecondaryMapper(kafkaAccess);
+ assertThat(matches).hasSize(1);
+ assertThat(matches).containsExactly(new ResourceID(KAFKA_NAME_1, NAMESPACE_1));
+ }
+
+ @Test
+ @DisplayName("When kafkaPrimaryToSecondaryMapper() is called with a KafkaAccess that references a Kafka but no namespace, " +
+ "then the returned set includes the Kafka with the namespace of the KafkaAccess")
+ void testKafkaAccessWithKafkaMissingNamespace() {
+ final KafkaReference kafkaReference = ResourceProvider.getKafkaReference(KAFKA_NAME_1, null);
+ final KafkaAccess kafkaAccess = ResourceProvider.getKafkaAccess(ACCESS_NAME_1, NAMESPACE_2, kafkaReference);
+
+ final Set<ResourceID> matches = KafkaAccessMapper.kafkaPrimaryToSecondaryMapper(kafkaAccess);
+ assertThat(matches).hasSize(1);
+ assertThat(matches).containsExactly(new ResourceID(KAFKA_NAME_1, NAMESPACE_2));
+ }
}
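As a quick cross-check of the fixed entries that both the removed reconciler constructor and the new SecretDependentResource constructor put into commonSecretData, the following standalone snippet (illustrative only, not part of the patch) prints the base64 values the tests build their expectations from:

import java.nio.charset.StandardCharsets;
import java.util.Base64;

public class CommonSecretDataCheck {
    public static void main(final String[] args) {
        final Base64.Encoder encoder = Base64.getEncoder();
        // The binding Secret data carries "type" -> "kafka" and "provider" -> "strimzi", base64-encoded
        System.out.println("type=" + encoder.encodeToString("kafka".getBytes(StandardCharsets.UTF_8)));       // a2Fma2E=
        System.out.println("provider=" + encoder.encodeToString("strimzi".getBytes(StandardCharsets.UTF_8))); // c3RyaW16aQ==
    }
}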