Skip to content

Commit

Permalink
feat: Update operator to use cached resources
Browse files Browse the repository at this point in the history
* Update operator to use cached resources rather
than calling Kubernetes directly.
* Move code to construct secret data into a
separate class to make it easier to switch to
DependentResource type later.

Signed-off-by: Katherine Stanley <[email protected]>
  • Loading branch information
katheris committed Aug 23, 2023
1 parent d74ff2a commit c45cb8b
Show file tree
Hide file tree
Showing 5 changed files with 201 additions and 113 deletions.
7 changes: 7 additions & 0 deletions pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -100,6 +100,7 @@
<!-- Test dependencies -->
<junit-jupiter.version>5.8.2</junit-jupiter.version>
<assertj.version>3.21.0</assertj.version>
<mockito.version>4.11.0</mockito.version>
</properties>

<distributionManagement>
Expand Down Expand Up @@ -218,6 +219,12 @@
<version>${fabric8.version}</version>
<scope>test</scope>
</dependency>
<dependency>
<groupId>org.mockito</groupId>
<artifactId>mockito-core</artifactId>
<version>${mockito.version}</version>
<scope>test</scope>
</dependency>
</dependencies>

<build>
Expand Down
112 changes: 24 additions & 88 deletions src/main/java/io/strimzi/kafka/access/KafkaAccessReconciler.java
Original file line number Diff line number Diff line change
Expand Up @@ -15,30 +15,18 @@
import io.javaoperatorsdk.operator.api.reconciler.EventSourceInitializer;
import io.javaoperatorsdk.operator.api.reconciler.Reconciler;
import io.javaoperatorsdk.operator.api.reconciler.UpdateControl;
import io.javaoperatorsdk.operator.processing.event.ResourceID;
import io.javaoperatorsdk.operator.processing.event.source.EventSource;
import io.javaoperatorsdk.operator.processing.event.source.informer.InformerEventSource;
import io.strimzi.api.kafka.Crds;
import io.strimzi.api.kafka.model.Kafka;
import io.strimzi.api.kafka.model.KafkaUser;
import io.strimzi.api.kafka.model.KafkaUserAuthentication;
import io.strimzi.api.kafka.model.KafkaUserSpec;
import io.strimzi.api.kafka.model.status.KafkaUserStatus;
import io.strimzi.kafka.access.internal.KafkaListener;
import io.strimzi.kafka.access.internal.KafkaAccessParser;
import io.strimzi.kafka.access.internal.KafkaUserData;
import io.strimzi.kafka.access.model.BindingStatus;
import io.strimzi.kafka.access.model.KafkaAccess;
import io.strimzi.kafka.access.model.KafkaAccessSpec;
import io.strimzi.kafka.access.model.KafkaAccessStatus;
import io.strimzi.kafka.access.model.KafkaReference;
import io.strimzi.kafka.access.model.KafkaUserReference;
import io.strimzi.kafka.access.internal.KafkaParser;
import io.strimzi.kafka.access.internal.CustomResourceParseException;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import java.nio.charset.StandardCharsets;
import java.util.Base64;
import java.util.HashMap;
import java.util.Map;
import java.util.Optional;
Expand All @@ -50,27 +38,24 @@
@ControllerConfiguration
public class KafkaAccessReconciler implements Reconciler<KafkaAccess>, EventSourceInitializer<KafkaAccess> {

private static final String TYPE_SECRET_KEY = "type";
private static final String TYPE_SECRET_VALUE = "kafka";
private static final String PROVIDER_SECRET_KEY = "provider";
private static final String PROVIDER_SECRET_VALUE = "strimzi";
private static final String SECRET_TYPE = "servicebinding.io/kafka";
private static final String ACCESS_SECRET_EVENT_SOURCE = "access-secret-event-source";

private final KubernetesClient kubernetesClient;
private final Map<String, String> commonSecretData = new HashMap<>();
private InformerEventSource<Secret, KafkaAccess> kafkaAccessSecretEventSource;
private final SecretDependentResource secretDependentResource;
private final Map<String, String> commonSecretLabels = new HashMap<>();

private static final String SECRET_TYPE = "servicebinding.io/kafka";
private static final Logger LOGGER = LoggerFactory.getLogger(KafkaAccessOperator.class);

/**
* Name of the event source for KafkaUser Secret resources
*/
public static final String KAFKA_USER_SECRET_EVENT_SOURCE = "KAFKA_USER_SECRET_EVENT_SOURCE";

/**
* @param kubernetesClient The Kubernetes client
*/
public KafkaAccessReconciler(final KubernetesClient kubernetesClient) {
this.kubernetesClient = kubernetesClient;
final Base64.Encoder encoder = Base64.getEncoder();
commonSecretData.put(TYPE_SECRET_KEY, encoder.encodeToString(TYPE_SECRET_VALUE.getBytes(StandardCharsets.UTF_8)));
commonSecretData.put(PROVIDER_SECRET_KEY, encoder.encodeToString(PROVIDER_SECRET_VALUE.getBytes(StandardCharsets.UTF_8)));
secretDependentResource = new SecretDependentResource();
commonSecretLabels.put(KafkaAccessParser.MANAGED_BY_LABEL_KEY, KafkaAccessParser.KAFKA_ACCESS_LABEL_VALUE);
}

Expand All @@ -88,8 +73,7 @@ public UpdateControl<KafkaAccess> reconcile(final KafkaAccess kafkaAccess, final
final String kafkaAccessNamespace = kafkaAccess.getMetadata().getNamespace();
LOGGER.info("Reconciling KafkaAccess {}/{}", kafkaAccessNamespace, kafkaAccessName);

final Map<String, String> data = secretData(kafkaAccess.getSpec(), kafkaAccessNamespace);
createOrUpdateSecret(context, data, kafkaAccess);
createOrUpdateSecret(secretDependentResource.desired(kafkaAccess.getSpec(), kafkaAccessNamespace, context), kafkaAccess);

final boolean bindingStatusCorrect = Optional.ofNullable(kafkaAccess.getStatus())
.map(KafkaAccessStatus::getBinding)
Expand All @@ -106,52 +90,13 @@ public UpdateControl<KafkaAccess> reconcile(final KafkaAccess kafkaAccess, final
}
}

protected Map<String, String> secretData(final KafkaAccessSpec spec, final String kafkaAccessNamespace) {
final KafkaReference kafkaReference = spec.getKafka();
final String kafkaClusterNamespace = Optional.ofNullable(kafkaReference.getNamespace()).orElse(kafkaAccessNamespace);
final Optional<KafkaUserReference> kafkaUserReference = Optional.ofNullable(spec.getUser());

final Kafka kafka = getKafka(kafkaReference.getName(), kafkaClusterNamespace);

final Map<String, String> data = new HashMap<>(commonSecretData);
final KafkaListener listener;
try {
if (kafkaUserReference.isPresent()) {
if (!KafkaUser.RESOURCE_KIND.equals(kafkaUserReference.get().getKind()) || !KafkaUser.RESOURCE_GROUP.equals(kafkaUserReference.get().getApiGroup())) {
throw new CustomResourceParseException(String.format("User kind must be %s and apiGroup must be %s", KafkaUser.RESOURCE_KIND, KafkaUser.RESOURCE_GROUP));
}
final String kafkaUserName = kafkaUserReference.get().getName();
final String kafkaUserNamespace = Optional.ofNullable(kafkaUserReference.get().getNamespace()).orElse(kafkaAccessNamespace);
final KafkaUser kafkaUser = getKafkaUser(kafkaUserName, kafkaUserNamespace);
final String kafkaUserType = Optional.ofNullable(kafkaUser)
.map(KafkaUser::getSpec)
.map(KafkaUserSpec::getAuthentication)
.map(KafkaUserAuthentication::getType)
.orElse(KafkaParser.USER_AUTH_UNDEFINED);
listener = KafkaParser.getKafkaListener(kafka, spec, kafkaUserType);
if (kafkaUser != null) {
final KafkaUserData userData = Optional.ofNullable(kafkaUser.getStatus())
.map(KafkaUserStatus::getSecret)
.map(secretName -> kubernetesClient.secrets().inNamespace(kafkaUserNamespace).withName(secretName).get())
.map(secret -> new KafkaUserData(kafkaUser).withSecret(secret))
.orElse(new KafkaUserData(kafkaUser));
data.putAll(userData.getConnectionSecretData());
}

} else {
listener = KafkaParser.getKafkaListener(kafka, spec);
}
} catch (CustomResourceParseException e) {
throw new IllegalStateException("Reconcile failed due to ParserException " + e.getMessage());
}
data.putAll(listener.getConnectionSecretData());
return data;
}

private void createOrUpdateSecret(final Context<KafkaAccess> context, final Map<String, String> data, final KafkaAccess kafkaAccess) {
private void createOrUpdateSecret(final Map<String, String> data, final KafkaAccess kafkaAccess) {
final String kafkaAccessName = kafkaAccess.getMetadata().getName();
final String kafkaAccessNamespace = kafkaAccess.getMetadata().getNamespace();
context.getSecondaryResource(Secret.class, ACCESS_SECRET_EVENT_SOURCE)
if (kafkaAccessSecretEventSource == null) {
throw new IllegalStateException("Event source for Kafka Access Secret not initialized, cannot reconcile");
}
kafkaAccessSecretEventSource.get(new ResourceID(kafkaAccessName, kafkaAccessNamespace))
.ifPresentOrElse(secret -> {
final Map<String, String> currentData = secret.getData();
if (!data.equals(currentData)) {
Expand Down Expand Up @@ -220,29 +165,20 @@ public Map<String, EventSource> prepareEventSources(final EventSourceContext<Kaf
.withSecondaryToPrimaryMapper(secret -> KafkaAccessParser.secretSecondaryToPrimaryMapper(context.getPrimaryCache().list(), secret))
.build(),
context);
InformerEventSource<Secret, KafkaAccess> kafkaAccessSecretEventSource = new InformerEventSource<>(
kafkaAccessSecretEventSource = new InformerEventSource<>(
InformerConfiguration.from(Secret.class)
.withLabelSelector(String.format("%s=%s", KafkaAccessParser.MANAGED_BY_LABEL_KEY, KafkaAccessParser.KAFKA_ACCESS_LABEL_VALUE))
.withSecondaryToPrimaryMapper(secret -> KafkaAccessParser.secretSecondaryToPrimaryMapper(context.getPrimaryCache().list(), secret))
.build(),
context);
Map<String, EventSource> eventSources = EventSourceInitializer.nameEventSources(kafkaEventSource, kafkaUserEventSource, strimziSecretEventSource, strimziKafkaUserSecretEventSource);
eventSources.put(ACCESS_SECRET_EVENT_SOURCE, kafkaAccessSecretEventSource);
Map<String, EventSource> eventSources = EventSourceInitializer.nameEventSources(
kafkaEventSource,
kafkaUserEventSource,
strimziSecretEventSource,
kafkaAccessSecretEventSource
);
eventSources.put(KAFKA_USER_SECRET_EVENT_SOURCE, strimziKafkaUserSecretEventSource);
LOGGER.info("Finished preparing event sources");
return eventSources;
}

private Kafka getKafka(final String clusterName, final String namespace) {
return Crds.kafkaOperation(kubernetesClient)
.inNamespace(namespace)
.withName(clusterName)
.get();
}

private KafkaUser getKafkaUser(final String name, final String namespace) {
return Crds.kafkaUserOperation(kubernetesClient)
.inNamespace(namespace)
.withName(name)
.get();
}
}
119 changes: 119 additions & 0 deletions src/main/java/io/strimzi/kafka/access/SecretDependentResource.java
Original file line number Diff line number Diff line change
@@ -0,0 +1,119 @@
/*
* Copyright Strimzi authors.
* License: Apache License 2.0 (see the file LICENSE or http://apache.org/licenses/LICENSE-2.0.html).
*/
package io.strimzi.kafka.access;

import io.fabric8.kubernetes.api.model.Secret;
import io.javaoperatorsdk.operator.api.reconciler.Context;
import io.javaoperatorsdk.operator.processing.event.ResourceID;
import io.javaoperatorsdk.operator.processing.event.source.informer.InformerEventSource;
import io.strimzi.api.kafka.model.Kafka;
import io.strimzi.api.kafka.model.KafkaUser;
import io.strimzi.api.kafka.model.KafkaUserAuthentication;
import io.strimzi.api.kafka.model.KafkaUserSpec;
import io.strimzi.api.kafka.model.status.KafkaUserStatus;
import io.strimzi.kafka.access.internal.CustomResourceParseException;
import io.strimzi.kafka.access.internal.KafkaListener;
import io.strimzi.kafka.access.internal.KafkaParser;
import io.strimzi.kafka.access.internal.KafkaUserData;
import io.strimzi.kafka.access.model.KafkaAccess;
import io.strimzi.kafka.access.model.KafkaAccessSpec;
import io.strimzi.kafka.access.model.KafkaReference;
import io.strimzi.kafka.access.model.KafkaUserReference;

import java.nio.charset.StandardCharsets;
import java.util.Base64;
import java.util.HashMap;
import java.util.Map;
import java.util.Optional;

/**
* Class to represent the data in the Secret created by the operator
* In future updates this could be updated to implement the
* Java Operator SDK DependentResource class
*/
public class SecretDependentResource {

    private static final String TYPE_SECRET_KEY = "type";
    private static final String TYPE_SECRET_VALUE = "kafka";
    private static final String PROVIDER_SECRET_KEY = "provider";
    private static final String PROVIDER_SECRET_VALUE = "strimzi";

    // Base64-encoded "type" and "provider" entries included in every Secret
    // produced by the operator; populated once in the constructor.
    private final Map<String, String> commonSecretData = new HashMap<>();

    /**
     * Default constructor that initialises the common secret data
     */
    public SecretDependentResource() {
        final Base64.Encoder encoder = Base64.getEncoder();
        commonSecretData.put(TYPE_SECRET_KEY, encoder.encodeToString(TYPE_SECRET_VALUE.getBytes(StandardCharsets.UTF_8)));
        commonSecretData.put(PROVIDER_SECRET_KEY, encoder.encodeToString(PROVIDER_SECRET_VALUE.getBytes(StandardCharsets.UTF_8)));
    }

    /**
     * The desired state of the data in the secret, resolved entirely from the
     * operator's informer caches (no direct Kubernetes API calls).
     *
     * @param spec      The spec of the KafkaAccess resource being reconciled
     * @param namespace The namespace of the KafkaAccess resource being reconciled
     * @param context   The event source context used to look up cached Kafka,
     *                  KafkaUser and Secret resources
     * @return The data for the Secret as a Map
     * @throws IllegalStateException if the referenced Kafka, KafkaUser or the
     *         KafkaUser's Secret is missing from the caches, or if listener
     *         parsing fails
     * @throws CustomResourceParseException if the user reference is not a
     *         Strimzi KafkaUser
     */
    @SuppressWarnings("unchecked") // eventSourceRetriever() returns the raw EventSource type; the informers are registered with these exact generic types
    public Map<String, String> desired(final KafkaAccessSpec spec, final String namespace, final Context<KafkaAccess> context) {
        final KafkaReference kafkaReference = spec.getKafka();
        final String kafkaClusterName = kafkaReference.getName();
        // A missing namespace on the reference defaults to the KafkaAccess namespace
        final String kafkaClusterNamespace = Optional.ofNullable(kafkaReference.getNamespace()).orElse(namespace);
        final InformerEventSource<Kafka, KafkaAccess> kafkaEventSource =
                (InformerEventSource<Kafka, KafkaAccess>) context.eventSourceRetriever().getResourceEventSourceFor(Kafka.class);
        final Kafka kafka = kafkaEventSource.get(new ResourceID(kafkaClusterName, kafkaClusterNamespace))
                .orElseThrow(() -> new IllegalStateException(String.format("Kafka %s/%s missing", kafkaClusterNamespace, kafkaClusterName)));
        final Optional<KafkaUserReference> kafkaUserReference = Optional.ofNullable(spec.getUser());
        if (kafkaUserReference.isPresent()) {
            final KafkaUserReference userReference = kafkaUserReference.get();
            if (!KafkaUser.RESOURCE_KIND.equals(userReference.getKind()) || !KafkaUser.RESOURCE_GROUP.equals(userReference.getApiGroup())) {
                throw new CustomResourceParseException(String.format("User kind must be %s and apiGroup must be %s", KafkaUser.RESOURCE_KIND, KafkaUser.RESOURCE_GROUP));
            }
            final String kafkaUserName = userReference.getName();
            final String kafkaUserNamespace = Optional.ofNullable(userReference.getNamespace()).orElse(namespace);
            final KafkaUser kafkaUser = context.getSecondaryResource(KafkaUser.class)
                    .orElseThrow(() -> new IllegalStateException(String.format("KafkaUser %s/%s missing", kafkaUserNamespace, kafkaUserName)));
            // The KafkaUser status names the Secret holding its credentials
            final String userSecretName = Optional.ofNullable(kafkaUser.getStatus())
                    .map(KafkaUserStatus::getSecret)
                    .orElseThrow(() -> new IllegalStateException(String.format("Secret for KafkaUser %s/%s missing", kafkaUserNamespace, kafkaUserName)));
            final InformerEventSource<Secret, KafkaAccess> kafkaUserSecretEventSource =
                    (InformerEventSource<Secret, KafkaAccess>) context.eventSourceRetriever()
                            .getResourceEventSourceFor(Secret.class, KafkaAccessReconciler.KAFKA_USER_SECRET_EVENT_SOURCE);
            final Secret kafkaUserSecret = kafkaUserSecretEventSource.get(new ResourceID(userSecretName, kafkaUserNamespace))
                    .orElseThrow(() -> new IllegalStateException(String.format("Secret for KafkaUser %s/%s missing", kafkaUserNamespace, kafkaUserName)));
            // secretDataWithUser already merges in commonSecretData, so no extra copy is needed here
            return secretDataWithUser(spec, kafka, kafkaUser, new KafkaUserData(kafkaUser).withSecret(kafkaUserSecret));
        }
        // secretData already merges in commonSecretData
        return secretData(spec, kafka);
    }

    /**
     * Builds the Secret data for a KafkaAccess without a user reference:
     * the common entries plus the listener connection details.
     *
     * @param spec  The spec of the KafkaAccess resource being reconciled
     * @param kafka The referenced Kafka resource from the cache
     * @return The data for the Secret as a Map
     * @throws IllegalStateException if the listener cannot be parsed from the Kafka resource
     */
    protected Map<String, String> secretData(final KafkaAccessSpec spec, final Kafka kafka) {
        final Map<String, String> data = new HashMap<>(commonSecretData);
        final KafkaListener listener;
        try {
            listener = KafkaParser.getKafkaListener(kafka, spec);
        } catch (CustomResourceParseException e) {
            // Keep the original exception as the cause so the stack trace is not lost
            throw new IllegalStateException("Reconcile failed due to ParserException " + e.getMessage(), e);
        }
        data.putAll(listener.getConnectionSecretData());
        return data;
    }

    /**
     * Builds the Secret data for a KafkaAccess with a user reference:
     * the common entries, the user's connection data and the listener
     * connection details (listener data takes precedence on key collisions).
     *
     * @param spec          The spec of the KafkaAccess resource being reconciled
     * @param kafka         The referenced Kafka resource from the cache
     * @param kafkaUser     The referenced KafkaUser resource from the cache
     * @param kafkaUserData The user data wrapper, including the user's Secret
     * @return The data for the Secret as a Map
     * @throws IllegalStateException if the listener cannot be parsed from the Kafka resource
     */
    protected Map<String, String> secretDataWithUser(final KafkaAccessSpec spec, final Kafka kafka, final KafkaUser kafkaUser, final KafkaUserData kafkaUserData) {
        final Map<String, String> data = new HashMap<>(commonSecretData);
        final KafkaListener listener;
        try {
            // An absent authentication type falls back to USER_AUTH_UNDEFINED
            final String kafkaUserType = Optional.ofNullable(kafkaUser.getSpec())
                    .map(KafkaUserSpec::getAuthentication)
                    .map(KafkaUserAuthentication::getType)
                    .orElse(KafkaParser.USER_AUTH_UNDEFINED);
            listener = KafkaParser.getKafkaListener(kafka, spec, kafkaUserType);
            data.putAll(kafkaUserData.getConnectionSecretData());
        } catch (CustomResourceParseException e) {
            // Keep the original exception as the cause so the stack trace is not lost
            throw new IllegalStateException("Reconcile failed due to ParserException " + e.getMessage(), e);
        }
        data.putAll(listener.getConnectionSecretData());
        return data;
    }
}
Loading

0 comments on commit c45cb8b

Please sign in to comment.