Skip to content

Commit

Permalink
feat: Update operator to use cached resources (#30)
Browse files Browse the repository at this point in the history
Signed-off-by: Katherine Stanley <[email protected]>
  • Loading branch information
katheris authored Aug 24, 2023
1 parent d74ff2a commit e02540b
Show file tree
Hide file tree
Showing 8 changed files with 355 additions and 161 deletions.
7 changes: 7 additions & 0 deletions pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -100,6 +100,7 @@
<!-- Test dependencies -->
<junit-jupiter.version>5.8.2</junit-jupiter.version>
<assertj.version>3.21.0</assertj.version>
<mockito.version>4.11.0</mockito.version>
</properties>

<distributionManagement>
Expand Down Expand Up @@ -218,6 +219,12 @@
<version>${fabric8.version}</version>
<scope>test</scope>
</dependency>
<dependency>
<groupId>org.mockito</groupId>
<artifactId>mockito-core</artifactId>
<version>${mockito.version}</version>
<scope>test</scope>
</dependency>
</dependencies>

<build>
Expand Down
135 changes: 36 additions & 99 deletions src/main/java/io/strimzi/kafka/access/KafkaAccessReconciler.java
Original file line number Diff line number Diff line change
Expand Up @@ -15,30 +15,18 @@
import io.javaoperatorsdk.operator.api.reconciler.EventSourceInitializer;
import io.javaoperatorsdk.operator.api.reconciler.Reconciler;
import io.javaoperatorsdk.operator.api.reconciler.UpdateControl;
import io.javaoperatorsdk.operator.processing.event.ResourceID;
import io.javaoperatorsdk.operator.processing.event.source.EventSource;
import io.javaoperatorsdk.operator.processing.event.source.informer.InformerEventSource;
import io.strimzi.api.kafka.Crds;
import io.strimzi.api.kafka.model.Kafka;
import io.strimzi.api.kafka.model.KafkaUser;
import io.strimzi.api.kafka.model.KafkaUserAuthentication;
import io.strimzi.api.kafka.model.KafkaUserSpec;
import io.strimzi.api.kafka.model.status.KafkaUserStatus;
import io.strimzi.kafka.access.internal.KafkaListener;
import io.strimzi.kafka.access.internal.KafkaAccessParser;
import io.strimzi.kafka.access.internal.KafkaUserData;
import io.strimzi.kafka.access.internal.KafkaAccessMapper;
import io.strimzi.kafka.access.model.BindingStatus;
import io.strimzi.kafka.access.model.KafkaAccess;
import io.strimzi.kafka.access.model.KafkaAccessSpec;
import io.strimzi.kafka.access.model.KafkaAccessStatus;
import io.strimzi.kafka.access.model.KafkaReference;
import io.strimzi.kafka.access.model.KafkaUserReference;
import io.strimzi.kafka.access.internal.KafkaParser;
import io.strimzi.kafka.access.internal.CustomResourceParseException;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import java.nio.charset.StandardCharsets;
import java.util.Base64;
import java.util.HashMap;
import java.util.Map;
import java.util.Optional;
Expand All @@ -50,28 +38,25 @@
@ControllerConfiguration
public class KafkaAccessReconciler implements Reconciler<KafkaAccess>, EventSourceInitializer<KafkaAccess> {

private static final String TYPE_SECRET_KEY = "type";
private static final String TYPE_SECRET_VALUE = "kafka";
private static final String PROVIDER_SECRET_KEY = "provider";
private static final String PROVIDER_SECRET_VALUE = "strimzi";
private static final String SECRET_TYPE = "servicebinding.io/kafka";
private static final String ACCESS_SECRET_EVENT_SOURCE = "access-secret-event-source";

private final KubernetesClient kubernetesClient;
private final Map<String, String> commonSecretData = new HashMap<>();
private InformerEventSource<Secret, KafkaAccess> kafkaAccessSecretEventSource;
private final SecretDependentResource secretDependentResource;
private final Map<String, String> commonSecretLabels = new HashMap<>();

private static final String SECRET_TYPE = "servicebinding.io/kafka";
private static final Logger LOGGER = LoggerFactory.getLogger(KafkaAccessOperator.class);

/**
* Name of the event source for KafkaUser Secret resources
*/
public static final String KAFKA_USER_SECRET_EVENT_SOURCE = "KAFKA_USER_SECRET_EVENT_SOURCE";

/**
* @param kubernetesClient The Kubernetes client
*/
public KafkaAccessReconciler(final KubernetesClient kubernetesClient) {
this.kubernetesClient = kubernetesClient;
final Base64.Encoder encoder = Base64.getEncoder();
commonSecretData.put(TYPE_SECRET_KEY, encoder.encodeToString(TYPE_SECRET_VALUE.getBytes(StandardCharsets.UTF_8)));
commonSecretData.put(PROVIDER_SECRET_KEY, encoder.encodeToString(PROVIDER_SECRET_VALUE.getBytes(StandardCharsets.UTF_8)));
commonSecretLabels.put(KafkaAccessParser.MANAGED_BY_LABEL_KEY, KafkaAccessParser.KAFKA_ACCESS_LABEL_VALUE);
secretDependentResource = new SecretDependentResource();
commonSecretLabels.put(KafkaAccessMapper.MANAGED_BY_LABEL_KEY, KafkaAccessMapper.KAFKA_ACCESS_LABEL_VALUE);
}

/**
Expand All @@ -88,8 +73,7 @@ public UpdateControl<KafkaAccess> reconcile(final KafkaAccess kafkaAccess, final
final String kafkaAccessNamespace = kafkaAccess.getMetadata().getNamespace();
LOGGER.info("Reconciling KafkaAccess {}/{}", kafkaAccessNamespace, kafkaAccessName);

final Map<String, String> data = secretData(kafkaAccess.getSpec(), kafkaAccessNamespace);
createOrUpdateSecret(context, data, kafkaAccess);
createOrUpdateSecret(secretDependentResource.desired(kafkaAccess.getSpec(), kafkaAccessNamespace, context), kafkaAccess);

final boolean bindingStatusCorrect = Optional.ofNullable(kafkaAccess.getStatus())
.map(KafkaAccessStatus::getBinding)
Expand All @@ -106,52 +90,13 @@ public UpdateControl<KafkaAccess> reconcile(final KafkaAccess kafkaAccess, final
}
}

protected Map<String, String> secretData(final KafkaAccessSpec spec, final String kafkaAccessNamespace) {
final KafkaReference kafkaReference = spec.getKafka();
final String kafkaClusterNamespace = Optional.ofNullable(kafkaReference.getNamespace()).orElse(kafkaAccessNamespace);
final Optional<KafkaUserReference> kafkaUserReference = Optional.ofNullable(spec.getUser());

final Kafka kafka = getKafka(kafkaReference.getName(), kafkaClusterNamespace);

final Map<String, String> data = new HashMap<>(commonSecretData);
final KafkaListener listener;
try {
if (kafkaUserReference.isPresent()) {
if (!KafkaUser.RESOURCE_KIND.equals(kafkaUserReference.get().getKind()) || !KafkaUser.RESOURCE_GROUP.equals(kafkaUserReference.get().getApiGroup())) {
throw new CustomResourceParseException(String.format("User kind must be %s and apiGroup must be %s", KafkaUser.RESOURCE_KIND, KafkaUser.RESOURCE_GROUP));
}
final String kafkaUserName = kafkaUserReference.get().getName();
final String kafkaUserNamespace = Optional.ofNullable(kafkaUserReference.get().getNamespace()).orElse(kafkaAccessNamespace);
final KafkaUser kafkaUser = getKafkaUser(kafkaUserName, kafkaUserNamespace);
final String kafkaUserType = Optional.ofNullable(kafkaUser)
.map(KafkaUser::getSpec)
.map(KafkaUserSpec::getAuthentication)
.map(KafkaUserAuthentication::getType)
.orElse(KafkaParser.USER_AUTH_UNDEFINED);
listener = KafkaParser.getKafkaListener(kafka, spec, kafkaUserType);
if (kafkaUser != null) {
final KafkaUserData userData = Optional.ofNullable(kafkaUser.getStatus())
.map(KafkaUserStatus::getSecret)
.map(secretName -> kubernetesClient.secrets().inNamespace(kafkaUserNamespace).withName(secretName).get())
.map(secret -> new KafkaUserData(kafkaUser).withSecret(secret))
.orElse(new KafkaUserData(kafkaUser));
data.putAll(userData.getConnectionSecretData());
}

} else {
listener = KafkaParser.getKafkaListener(kafka, spec);
}
} catch (CustomResourceParseException e) {
throw new IllegalStateException("Reconcile failed due to ParserException " + e.getMessage());
}
data.putAll(listener.getConnectionSecretData());
return data;
}

private void createOrUpdateSecret(final Context<KafkaAccess> context, final Map<String, String> data, final KafkaAccess kafkaAccess) {
private void createOrUpdateSecret(final Map<String, String> data, final KafkaAccess kafkaAccess) {
final String kafkaAccessName = kafkaAccess.getMetadata().getName();
final String kafkaAccessNamespace = kafkaAccess.getMetadata().getNamespace();
context.getSecondaryResource(Secret.class, ACCESS_SECRET_EVENT_SOURCE)
if (kafkaAccessSecretEventSource == null) {
throw new IllegalStateException("Event source for Kafka Access Secret not initialized, cannot reconcile");
}
kafkaAccessSecretEventSource.get(new ResourceID(kafkaAccessName, kafkaAccessNamespace))
.ifPresentOrElse(secret -> {
final Map<String, String> currentData = secret.getData();
if (!data.equals(currentData)) {
Expand Down Expand Up @@ -199,50 +144,42 @@ public Map<String, EventSource> prepareEventSources(final EventSourceContext<Kaf
LOGGER.info("Preparing event sources");
InformerEventSource<Kafka, KafkaAccess> kafkaEventSource = new InformerEventSource<>(
InformerConfiguration.from(Kafka.class, context)
.withSecondaryToPrimaryMapper(kafka -> KafkaAccessParser.kafkaSecondaryToPrimaryMapper(context.getPrimaryCache().list(), kafka))
.withSecondaryToPrimaryMapper(kafka -> KafkaAccessMapper.kafkaSecondaryToPrimaryMapper(context.getPrimaryCache().list(), kafka))
.withPrimaryToSecondaryMapper(kafkaAccess -> KafkaAccessMapper.kafkaPrimaryToSecondaryMapper((KafkaAccess) kafkaAccess))
.build(),
context);
InformerEventSource<KafkaUser, KafkaAccess> kafkaUserEventSource = new InformerEventSource<>(
InformerConfiguration.from(KafkaUser.class, context)
.withSecondaryToPrimaryMapper(kafkaUser -> KafkaAccessParser.kafkaUserSecondaryToPrimaryMapper(context.getPrimaryCache().list(), kafkaUser))
.withPrimaryToSecondaryMapper(kafkaAccess -> KafkaAccessParser.kafkaUserPrimaryToSecondaryMapper((KafkaAccess) kafkaAccess))
.withSecondaryToPrimaryMapper(kafkaUser -> KafkaAccessMapper.kafkaUserSecondaryToPrimaryMapper(context.getPrimaryCache().list(), kafkaUser))
.withPrimaryToSecondaryMapper(kafkaAccess -> KafkaAccessMapper.kafkaUserPrimaryToSecondaryMapper((KafkaAccess) kafkaAccess))
.build(),
context);
InformerEventSource<Secret, KafkaAccess> strimziSecretEventSource = new InformerEventSource<>(
InformerConfiguration.from(Secret.class)
.withLabelSelector(String.format("%s=%s", KafkaAccessParser.MANAGED_BY_LABEL_KEY, KafkaAccessParser.STRIMZI_CLUSTER_LABEL_VALUE))
.withSecondaryToPrimaryMapper(secret -> KafkaAccessParser.secretSecondaryToPrimaryMapper(context.getPrimaryCache().list(), secret))
.withLabelSelector(String.format("%s=%s", KafkaAccessMapper.MANAGED_BY_LABEL_KEY, KafkaAccessMapper.STRIMZI_CLUSTER_LABEL_VALUE))
.withSecondaryToPrimaryMapper(secret -> KafkaAccessMapper.secretSecondaryToPrimaryMapper(context.getPrimaryCache().list(), secret))
.build(),
context);
InformerEventSource<Secret, KafkaAccess> strimziKafkaUserSecretEventSource = new InformerEventSource<>(
InformerConfiguration.from(Secret.class)
.withLabelSelector(String.format("%s=%s", KafkaAccessParser.MANAGED_BY_LABEL_KEY, KafkaAccessParser.STRIMZI_USER_LABEL_VALUE))
.withSecondaryToPrimaryMapper(secret -> KafkaAccessParser.secretSecondaryToPrimaryMapper(context.getPrimaryCache().list(), secret))
.withLabelSelector(String.format("%s=%s", KafkaAccessMapper.MANAGED_BY_LABEL_KEY, KafkaAccessMapper.STRIMZI_USER_LABEL_VALUE))
.withSecondaryToPrimaryMapper(secret -> KafkaAccessMapper.secretSecondaryToPrimaryMapper(context.getPrimaryCache().list(), secret))
.build(),
context);
InformerEventSource<Secret, KafkaAccess> kafkaAccessSecretEventSource = new InformerEventSource<>(
kafkaAccessSecretEventSource = new InformerEventSource<>(
InformerConfiguration.from(Secret.class)
.withLabelSelector(String.format("%s=%s", KafkaAccessParser.MANAGED_BY_LABEL_KEY, KafkaAccessParser.KAFKA_ACCESS_LABEL_VALUE))
.withSecondaryToPrimaryMapper(secret -> KafkaAccessParser.secretSecondaryToPrimaryMapper(context.getPrimaryCache().list(), secret))
.withLabelSelector(String.format("%s=%s", KafkaAccessMapper.MANAGED_BY_LABEL_KEY, KafkaAccessMapper.KAFKA_ACCESS_LABEL_VALUE))
.withSecondaryToPrimaryMapper(secret -> KafkaAccessMapper.secretSecondaryToPrimaryMapper(context.getPrimaryCache().list(), secret))
.build(),
context);
Map<String, EventSource> eventSources = EventSourceInitializer.nameEventSources(kafkaEventSource, kafkaUserEventSource, strimziSecretEventSource, strimziKafkaUserSecretEventSource);
eventSources.put(ACCESS_SECRET_EVENT_SOURCE, kafkaAccessSecretEventSource);
Map<String, EventSource> eventSources = EventSourceInitializer.nameEventSources(
kafkaEventSource,
kafkaUserEventSource,
strimziSecretEventSource,
kafkaAccessSecretEventSource
);
eventSources.put(KAFKA_USER_SECRET_EVENT_SOURCE, strimziKafkaUserSecretEventSource);
LOGGER.info("Finished preparing event sources");
return eventSources;
}

private Kafka getKafka(final String clusterName, final String namespace) {
return Crds.kafkaOperation(kubernetesClient)
.inNamespace(namespace)
.withName(clusterName)
.get();
}

private KafkaUser getKafkaUser(final String name, final String namespace) {
return Crds.kafkaUserOperation(kubernetesClient)
.inNamespace(namespace)
.withName(name)
.get();
}
}
Loading

0 comments on commit e02540b

Please sign in to comment.