Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat: Update operator to use cached resources #30

Merged
merged 2 commits into from
Aug 24, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
7 changes: 7 additions & 0 deletions pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -100,6 +100,7 @@
<!-- Test dependencies -->
<junit-jupiter.version>5.8.2</junit-jupiter.version>
<assertj.version>3.21.0</assertj.version>
<mockito.version>4.11.0</mockito.version>
</properties>

<distributionManagement>
Expand Down Expand Up @@ -218,6 +219,12 @@
<version>${fabric8.version}</version>
<scope>test</scope>
</dependency>
<dependency>
<groupId>org.mockito</groupId>
<artifactId>mockito-core</artifactId>
<version>${mockito.version}</version>
<scope>test</scope>
</dependency>
</dependencies>

<build>
Expand Down
135 changes: 36 additions & 99 deletions src/main/java/io/strimzi/kafka/access/KafkaAccessReconciler.java
Original file line number Diff line number Diff line change
Expand Up @@ -15,30 +15,18 @@
import io.javaoperatorsdk.operator.api.reconciler.EventSourceInitializer;
import io.javaoperatorsdk.operator.api.reconciler.Reconciler;
import io.javaoperatorsdk.operator.api.reconciler.UpdateControl;
import io.javaoperatorsdk.operator.processing.event.ResourceID;
import io.javaoperatorsdk.operator.processing.event.source.EventSource;
import io.javaoperatorsdk.operator.processing.event.source.informer.InformerEventSource;
import io.strimzi.api.kafka.Crds;
import io.strimzi.api.kafka.model.Kafka;
import io.strimzi.api.kafka.model.KafkaUser;
import io.strimzi.api.kafka.model.KafkaUserAuthentication;
import io.strimzi.api.kafka.model.KafkaUserSpec;
import io.strimzi.api.kafka.model.status.KafkaUserStatus;
import io.strimzi.kafka.access.internal.KafkaListener;
import io.strimzi.kafka.access.internal.KafkaAccessParser;
import io.strimzi.kafka.access.internal.KafkaUserData;
import io.strimzi.kafka.access.internal.KafkaAccessMapper;
import io.strimzi.kafka.access.model.BindingStatus;
import io.strimzi.kafka.access.model.KafkaAccess;
import io.strimzi.kafka.access.model.KafkaAccessSpec;
import io.strimzi.kafka.access.model.KafkaAccessStatus;
import io.strimzi.kafka.access.model.KafkaReference;
import io.strimzi.kafka.access.model.KafkaUserReference;
import io.strimzi.kafka.access.internal.KafkaParser;
import io.strimzi.kafka.access.internal.CustomResourceParseException;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import java.nio.charset.StandardCharsets;
import java.util.Base64;
import java.util.HashMap;
import java.util.Map;
import java.util.Optional;
Expand All @@ -50,28 +38,25 @@
@ControllerConfiguration
public class KafkaAccessReconciler implements Reconciler<KafkaAccess>, EventSourceInitializer<KafkaAccess> {

private static final String TYPE_SECRET_KEY = "type";
private static final String TYPE_SECRET_VALUE = "kafka";
private static final String PROVIDER_SECRET_KEY = "provider";
private static final String PROVIDER_SECRET_VALUE = "strimzi";
private static final String SECRET_TYPE = "servicebinding.io/kafka";
private static final String ACCESS_SECRET_EVENT_SOURCE = "access-secret-event-source";

private final KubernetesClient kubernetesClient;
private final Map<String, String> commonSecretData = new HashMap<>();
private InformerEventSource<Secret, KafkaAccess> kafkaAccessSecretEventSource;
private final SecretDependentResource secretDependentResource;
private final Map<String, String> commonSecretLabels = new HashMap<>();

private static final String SECRET_TYPE = "servicebinding.io/kafka";
private static final Logger LOGGER = LoggerFactory.getLogger(KafkaAccessOperator.class);

/**
* Name of the event source for KafkaUser Secret resources
*/
public static final String KAFKA_USER_SECRET_EVENT_SOURCE = "KAFKA_USER_SECRET_EVENT_SOURCE";

/**
 * Creates the reconciler, storing the Kubernetes client and initialising the
 * data and labels shared by every binding Secret this operator manages.
 *
 * @param kubernetesClient The Kubernetes client used for all API calls
 */
public KafkaAccessReconciler(final KubernetesClient kubernetesClient) {
// NOTE(review): this constructor appears to merge both sides of a diff — the
// base64 commonSecretData entries look like the old version, while
// SecretDependentResource and the KafkaAccessMapper label look like the new
// one. Confirm which variant is intended before relying on this block.
this.kubernetesClient = kubernetesClient;
final Base64.Encoder encoder = Base64.getEncoder();
// Static "type"/"provider" entries marking the Secret as a Strimzi Kafka binding
commonSecretData.put(TYPE_SECRET_KEY, encoder.encodeToString(TYPE_SECRET_VALUE.getBytes(StandardCharsets.UTF_8)));
commonSecretData.put(PROVIDER_SECRET_KEY, encoder.encodeToString(PROVIDER_SECRET_VALUE.getBytes(StandardCharsets.UTF_8)));
// Label that marks Secrets as managed by the KafkaAccess operator
commonSecretLabels.put(KafkaAccessParser.MANAGED_BY_LABEL_KEY, KafkaAccessParser.KAFKA_ACCESS_LABEL_VALUE);
secretDependentResource = new SecretDependentResource();
commonSecretLabels.put(KafkaAccessMapper.MANAGED_BY_LABEL_KEY, KafkaAccessMapper.KAFKA_ACCESS_LABEL_VALUE);
}

/**
Expand All @@ -88,8 +73,7 @@ public UpdateControl<KafkaAccess> reconcile(final KafkaAccess kafkaAccess, final
final String kafkaAccessNamespace = kafkaAccess.getMetadata().getNamespace();
LOGGER.info("Reconciling KafkaAccess {}/{}", kafkaAccessNamespace, kafkaAccessName);

final Map<String, String> data = secretData(kafkaAccess.getSpec(), kafkaAccessNamespace);
createOrUpdateSecret(context, data, kafkaAccess);
createOrUpdateSecret(secretDependentResource.desired(kafkaAccess.getSpec(), kafkaAccessNamespace, context), kafkaAccess);

final boolean bindingStatusCorrect = Optional.ofNullable(kafkaAccess.getStatus())
.map(KafkaAccessStatus::getBinding)
Expand All @@ -106,52 +90,13 @@ public UpdateControl<KafkaAccess> reconcile(final KafkaAccess kafkaAccess, final
}
}

/**
 * Builds the data map for the KafkaAccess binding Secret from the referenced
 * Kafka cluster and, when present, the referenced KafkaUser.
 *
 * @param spec                 The KafkaAccess spec holding the Kafka and optional KafkaUser references
 * @param kafkaAccessNamespace The namespace of the KafkaAccess resource, used as the
 *                             default namespace when a reference omits its own
 * @return The secret data entries for the binding Secret
 * @throws IllegalStateException if the referenced resources cannot be parsed
 */
protected Map<String, String> secretData(final KafkaAccessSpec spec, final String kafkaAccessNamespace) {
    final KafkaReference kafkaReference = spec.getKafka();
    final String kafkaClusterNamespace = Optional.ofNullable(kafkaReference.getNamespace()).orElse(kafkaAccessNamespace);
    final Optional<KafkaUserReference> kafkaUserReference = Optional.ofNullable(spec.getUser());

    final Kafka kafka = getKafka(kafkaReference.getName(), kafkaClusterNamespace);

    final Map<String, String> data = new HashMap<>(commonSecretData);
    final KafkaListener listener;
    try {
        if (kafkaUserReference.isPresent()) {
            final KafkaUserReference userReference = kafkaUserReference.get();
            // Only Strimzi KafkaUser resources are supported as user references
            if (!KafkaUser.RESOURCE_KIND.equals(userReference.getKind()) || !KafkaUser.RESOURCE_GROUP.equals(userReference.getApiGroup())) {
                throw new CustomResourceParseException(String.format("User kind must be %s and apiGroup must be %s", KafkaUser.RESOURCE_KIND, KafkaUser.RESOURCE_GROUP));
            }
            final String kafkaUserName = userReference.getName();
            final String kafkaUserNamespace = Optional.ofNullable(userReference.getNamespace()).orElse(kafkaAccessNamespace);
            final KafkaUser kafkaUser = getKafkaUser(kafkaUserName, kafkaUserNamespace);
            // Fall back to the "undefined" auth type when the user or its authentication is absent
            final String kafkaUserType = Optional.ofNullable(kafkaUser)
                    .map(KafkaUser::getSpec)
                    .map(KafkaUserSpec::getAuthentication)
                    .map(KafkaUserAuthentication::getType)
                    .orElse(KafkaParser.USER_AUTH_UNDEFINED);
            listener = KafkaParser.getKafkaListener(kafka, spec, kafkaUserType);
            if (kafkaUser != null) {
                // Merge in the user's credentials once its Secret has been created by the User Operator
                final KafkaUserData userData = Optional.ofNullable(kafkaUser.getStatus())
                        .map(KafkaUserStatus::getSecret)
                        .map(secretName -> kubernetesClient.secrets().inNamespace(kafkaUserNamespace).withName(secretName).get())
                        .map(secret -> new KafkaUserData(kafkaUser).withSecret(secret))
                        .orElse(new KafkaUserData(kafkaUser));
                data.putAll(userData.getConnectionSecretData());
            }
        } else {
            listener = KafkaParser.getKafkaListener(kafka, spec);
        }
    } catch (CustomResourceParseException e) {
        // Preserve the cause so the original parse failure's stack trace is not lost
        throw new IllegalStateException("Reconcile failed due to ParserException " + e.getMessage(), e);
    }
    data.putAll(listener.getConnectionSecretData());
    return data;
}

private void createOrUpdateSecret(final Context<KafkaAccess> context, final Map<String, String> data, final KafkaAccess kafkaAccess) {
private void createOrUpdateSecret(final Map<String, String> data, final KafkaAccess kafkaAccess) {
final String kafkaAccessName = kafkaAccess.getMetadata().getName();
final String kafkaAccessNamespace = kafkaAccess.getMetadata().getNamespace();
context.getSecondaryResource(Secret.class, ACCESS_SECRET_EVENT_SOURCE)
if (kafkaAccessSecretEventSource == null) {
throw new IllegalStateException("Event source for Kafka Access Secret not initialized, cannot reconcile");
}
kafkaAccessSecretEventSource.get(new ResourceID(kafkaAccessName, kafkaAccessNamespace))
.ifPresentOrElse(secret -> {
final Map<String, String> currentData = secret.getData();
if (!data.equals(currentData)) {
Expand Down Expand Up @@ -199,50 +144,42 @@ public Map<String, EventSource> prepareEventSources(final EventSourceContext<Kaf
LOGGER.info("Preparing event sources");
InformerEventSource<Kafka, KafkaAccess> kafkaEventSource = new InformerEventSource<>(
InformerConfiguration.from(Kafka.class, context)
.withSecondaryToPrimaryMapper(kafka -> KafkaAccessParser.kafkaSecondaryToPrimaryMapper(context.getPrimaryCache().list(), kafka))
.withSecondaryToPrimaryMapper(kafka -> KafkaAccessMapper.kafkaSecondaryToPrimaryMapper(context.getPrimaryCache().list(), kafka))
.withPrimaryToSecondaryMapper(kafkaAccess -> KafkaAccessMapper.kafkaPrimaryToSecondaryMapper((KafkaAccess) kafkaAccess))
.build(),
context);
InformerEventSource<KafkaUser, KafkaAccess> kafkaUserEventSource = new InformerEventSource<>(
InformerConfiguration.from(KafkaUser.class, context)
.withSecondaryToPrimaryMapper(kafkaUser -> KafkaAccessParser.kafkaUserSecondaryToPrimaryMapper(context.getPrimaryCache().list(), kafkaUser))
.withPrimaryToSecondaryMapper(kafkaAccess -> KafkaAccessParser.kafkaUserPrimaryToSecondaryMapper((KafkaAccess) kafkaAccess))
.withSecondaryToPrimaryMapper(kafkaUser -> KafkaAccessMapper.kafkaUserSecondaryToPrimaryMapper(context.getPrimaryCache().list(), kafkaUser))
.withPrimaryToSecondaryMapper(kafkaAccess -> KafkaAccessMapper.kafkaUserPrimaryToSecondaryMapper((KafkaAccess) kafkaAccess))
.build(),
context);
InformerEventSource<Secret, KafkaAccess> strimziSecretEventSource = new InformerEventSource<>(
InformerConfiguration.from(Secret.class)
.withLabelSelector(String.format("%s=%s", KafkaAccessParser.MANAGED_BY_LABEL_KEY, KafkaAccessParser.STRIMZI_CLUSTER_LABEL_VALUE))
.withSecondaryToPrimaryMapper(secret -> KafkaAccessParser.secretSecondaryToPrimaryMapper(context.getPrimaryCache().list(), secret))
.withLabelSelector(String.format("%s=%s", KafkaAccessMapper.MANAGED_BY_LABEL_KEY, KafkaAccessMapper.STRIMZI_CLUSTER_LABEL_VALUE))
.withSecondaryToPrimaryMapper(secret -> KafkaAccessMapper.secretSecondaryToPrimaryMapper(context.getPrimaryCache().list(), secret))
.build(),
context);
InformerEventSource<Secret, KafkaAccess> strimziKafkaUserSecretEventSource = new InformerEventSource<>(
InformerConfiguration.from(Secret.class)
.withLabelSelector(String.format("%s=%s", KafkaAccessParser.MANAGED_BY_LABEL_KEY, KafkaAccessParser.STRIMZI_USER_LABEL_VALUE))
.withSecondaryToPrimaryMapper(secret -> KafkaAccessParser.secretSecondaryToPrimaryMapper(context.getPrimaryCache().list(), secret))
.withLabelSelector(String.format("%s=%s", KafkaAccessMapper.MANAGED_BY_LABEL_KEY, KafkaAccessMapper.STRIMZI_USER_LABEL_VALUE))
.withSecondaryToPrimaryMapper(secret -> KafkaAccessMapper.secretSecondaryToPrimaryMapper(context.getPrimaryCache().list(), secret))
.build(),
context);
InformerEventSource<Secret, KafkaAccess> kafkaAccessSecretEventSource = new InformerEventSource<>(
kafkaAccessSecretEventSource = new InformerEventSource<>(
InformerConfiguration.from(Secret.class)
.withLabelSelector(String.format("%s=%s", KafkaAccessParser.MANAGED_BY_LABEL_KEY, KafkaAccessParser.KAFKA_ACCESS_LABEL_VALUE))
.withSecondaryToPrimaryMapper(secret -> KafkaAccessParser.secretSecondaryToPrimaryMapper(context.getPrimaryCache().list(), secret))
.withLabelSelector(String.format("%s=%s", KafkaAccessMapper.MANAGED_BY_LABEL_KEY, KafkaAccessMapper.KAFKA_ACCESS_LABEL_VALUE))
.withSecondaryToPrimaryMapper(secret -> KafkaAccessMapper.secretSecondaryToPrimaryMapper(context.getPrimaryCache().list(), secret))
.build(),
context);
Map<String, EventSource> eventSources = EventSourceInitializer.nameEventSources(kafkaEventSource, kafkaUserEventSource, strimziSecretEventSource, strimziKafkaUserSecretEventSource);
eventSources.put(ACCESS_SECRET_EVENT_SOURCE, kafkaAccessSecretEventSource);
Map<String, EventSource> eventSources = EventSourceInitializer.nameEventSources(
kafkaEventSource,
kafkaUserEventSource,
strimziSecretEventSource,
kafkaAccessSecretEventSource
);
eventSources.put(KAFKA_USER_SECRET_EVENT_SOURCE, strimziKafkaUserSecretEventSource);
LOGGER.info("Finished preparing event sources");
return eventSources;
}

/**
 * Looks up the Kafka custom resource with the given name in the given namespace.
 *
 * @param clusterName The name of the Kafka cluster resource
 * @param namespace   The namespace to search in
 * @return The Kafka resource, or null when it does not exist
 */
private Kafka getKafka(final String clusterName, final String namespace) {
    return Crds.kafkaOperation(kubernetesClient).inNamespace(namespace).withName(clusterName).get();
}

/**
 * Looks up the KafkaUser custom resource with the given name in the given namespace.
 *
 * @param name      The name of the KafkaUser resource
 * @param namespace The namespace to search in
 * @return The KafkaUser resource, or null when it does not exist
 */
private KafkaUser getKafkaUser(final String name, final String namespace) {
    return Crds.kafkaUserOperation(kubernetesClient).inNamespace(namespace).withName(name).get();
}
}
Loading