diff --git a/operator/pom.xml b/operator/pom.xml
index 049357c3f..518dde1a3 100644
--- a/operator/pom.xml
+++ b/operator/pom.xml
@@ -13,6 +13,7 @@
true
+ 512m
diff --git a/operator/src/main/java/org/bf2/operator/managers/OperandOverrideManager.java b/operator/src/main/java/org/bf2/operator/managers/OperandOverrideManager.java
new file mode 100644
index 000000000..e0ce6de05
--- /dev/null
+++ b/operator/src/main/java/org/bf2/operator/managers/OperandOverrideManager.java
@@ -0,0 +1,149 @@
+package org.bf2.operator.managers;
+
+import com.fasterxml.jackson.annotation.JsonIgnoreProperties;
+import com.fasterxml.jackson.annotation.JsonProperty;
+import io.fabric8.kubernetes.api.model.ConfigMap;
+import io.fabric8.kubernetes.client.KubernetesClient;
+import io.fabric8.kubernetes.client.informers.ResourceEventHandler;
+import io.fabric8.kubernetes.client.utils.Serialization;
+import io.quarkus.runtime.Startup;
+import org.bf2.common.ResourceInformerFactory;
+import org.eclipse.microprofile.config.inject.ConfigProperty;
+import org.jboss.logging.Logger;
+
+import javax.annotation.PostConstruct;
+import javax.enterprise.context.ApplicationScoped;
+import javax.inject.Inject;
+
+import java.util.Map;
+import java.util.Optional;
+import java.util.concurrent.ConcurrentHashMap;
+
+@Startup
+@ApplicationScoped
+public class OperandOverrideManager {
+
+ @JsonIgnoreProperties(ignoreUnknown = true)
+ public static class OperandOverride {
+ public String image;
+
+ public String getImage() {
+ return image;
+ }
+
+ public void setImage(String image) {
+ this.image = image;
+ }
+ }
+
+ @JsonIgnoreProperties(ignoreUnknown = true)
+ public static class Canary extends OperandOverride {
+ public OperandOverride init = new OperandOverride();
+ }
+
+ @JsonIgnoreProperties(ignoreUnknown = true)
+ public static class OperandOverrides {
+ public Canary canary = new Canary();
+ @JsonProperty(value = "admin-server")
+ public OperandOverride adminServer = new OperandOverride();
+ }
+
+ static final OperandOverrides EMPTY = new OperandOverrides();
+
+ public static final String OPERANDS_YAML = "fleetshard_operands.yaml";
+
+ private Map overrides = new ConcurrentHashMap<>();
+
+ @ConfigProperty(name = "image.admin-api")
+ String adminApiImage;
+
+ @ConfigProperty(name = "image.canary")
+ String canaryImage;
+
+ @ConfigProperty(name = "image.canary-init")
+ String canaryInitImage;
+
+ @Inject
+ KubernetesClient kubernetesClient;
+
+ @Inject
+ ResourceInformerFactory resourceInformerFactory;
+
+ @Inject
+ InformerManager informerManager;
+
+ @Inject
+ Logger log;
+
+ @PostConstruct
+ protected void onStart() {
+ this.resourceInformerFactory.create(ConfigMap.class,
+ this.kubernetesClient.configMaps().inAnyNamespace().withLabel("app", "strimzi"),
+ new ResourceEventHandler() {
+ @Override
+ public void onAdd(ConfigMap obj) {
+ updateOverrides(obj);
+ }
+
+ @Override
+ public void onDelete(ConfigMap obj, boolean deletedFinalStateUnknown) {
+ removeOverrides(obj);
+ }
+
+ @Override
+ public void onUpdate(ConfigMap oldObj, ConfigMap newObj) {
+ updateOverrides(newObj);
+ }
+ });
+ }
+
+ private OperandOverrides getOverrides(String strimzi) {
+ return overrides.getOrDefault(strimzi == null ? "" : strimzi, EMPTY);
+ }
+
+ public String getCanaryImage(String strimzi) {
+ return Optional.ofNullable(getOverrides(strimzi).canary.image).orElse(canaryImage);
+ }
+
+ public String getCanaryInitImage(String strimzi) {
+ return Optional.ofNullable(getOverrides(strimzi).canary.init.image).orElse(canaryInitImage);
+ }
+
+ public String getAdminServerImage(String strimzi) {
+ return Optional.ofNullable(getOverrides(strimzi).adminServer.image).orElse(adminApiImage);
+ }
+
+ void updateOverrides(ConfigMap obj) {
+ String name = obj.getMetadata().getName();
+ if (name.startsWith(StrimziManager.STRIMZI_CLUSTER_OPERATOR)) {
+ String data = obj.getData().get(OPERANDS_YAML);
+ log.infof("Updating overrides for {} to {}", name, data);
+ boolean resync = false;
+ if (data == null) {
+ overrides.remove(name);
+ resync = true;
+ } else {
+ OperandOverrides operands = Serialization.unmarshal(data, OperandOverrides.class);
+ OperandOverrides old = overrides.put(name, operands);
+ resync = old == null || !Serialization.asYaml(old).equals(Serialization.asYaml(operands));
+ }
+ if (resync) {
+ informerManager.resyncManagedKafka();
+ }
+ }
+ }
+
+ void removeOverrides(ConfigMap obj) {
+ String name = obj.getMetadata().getName();
+ if (name.startsWith(StrimziManager.STRIMZI_CLUSTER_OPERATOR)) {
+ log.infof("removing overrides for {}", name);
+ overrides.remove(name);
+ informerManager.resyncManagedKafka();
+ }
+ }
+
+ void resetOverrides() {
+ this.overrides.clear();
+ }
+
+}
diff --git a/operator/src/main/java/org/bf2/operator/managers/StrimziManager.java b/operator/src/main/java/org/bf2/operator/managers/StrimziManager.java
index 096c1788a..bcca0abe4 100644
--- a/operator/src/main/java/org/bf2/operator/managers/StrimziManager.java
+++ b/operator/src/main/java/org/bf2/operator/managers/StrimziManager.java
@@ -34,6 +34,7 @@
@ApplicationScoped
public class StrimziManager {
+ public static final String STRIMZI_CLUSTER_OPERATOR = "strimzi-cluster-operator";
public static final String STRIMZI_PAUSE_RECONCILE_ANNOTATION = "strimzi.io/pause-reconciliation";
public static final String STRIMZI_PAUSE_REASON_ANNOTATION = "managedkafka.bf2.org/pause-reason";
@@ -110,7 +111,7 @@ private void updateStatus() {
}
private boolean isStrimziDeployment(Deployment deployment) {
- return deployment.getMetadata().getName().startsWith("strimzi-cluster-operator");
+ return deployment.getMetadata().getName().startsWith(STRIMZI_CLUSTER_OPERATOR);
}
});
}
diff --git a/operator/src/main/java/org/bf2/operator/operands/AdminServer.java b/operator/src/main/java/org/bf2/operator/operands/AdminServer.java
index 68752f7da..b98df5542 100644
--- a/operator/src/main/java/org/bf2/operator/operands/AdminServer.java
+++ b/operator/src/main/java/org/bf2/operator/operands/AdminServer.java
@@ -36,6 +36,7 @@
import org.bf2.common.OperandUtils;
import org.bf2.operator.managers.ImagePullSecretManager;
import org.bf2.operator.managers.IngressControllerManager;
+import org.bf2.operator.managers.OperandOverrideManager;
import org.bf2.operator.managers.SecuritySecretManager;
import org.bf2.operator.resources.v1alpha1.ManagedKafka;
import org.bf2.operator.resources.v1alpha1.ManagedKafkaAuthenticationOAuth;
@@ -81,9 +82,6 @@ public class AdminServer extends AbstractAdminServer {
@Inject
Logger log;
- @ConfigProperty(name = "image.admin-api")
- String adminApiImage;
-
@ConfigProperty(name = "adminserver.cors.allowlist")
Optional corsAllowList;
@@ -101,6 +99,9 @@ public class AdminServer extends AbstractAdminServer {
@Inject
protected Instance ingressControllerManagerInstance;
+ @Inject
+ protected OperandOverrideManager overrideManager;
+
void onStart(@Observes StartupEvent ev) {
if (kubernetesClient.isAdaptable(OpenShiftClient.class)) {
openShiftClient = kubernetesClient.adapt(OpenShiftClient.class);
@@ -257,7 +258,7 @@ protected Route routeFrom(ManagedKafka managedKafka, Route current) {
protected List buildContainers(ManagedKafka managedKafka) {
Container container = new ContainerBuilder()
.withName("admin-server")
- .withImage(adminApiImage)
+ .withImage(overrideManager.getAdminServerImage(managedKafka.getSpec().getVersions().getStrimzi()))
.withEnv(buildEnvVar(managedKafka))
.withPorts(buildContainerPorts(managedKafka))
.withResources(buildResources())
diff --git a/operator/src/main/java/org/bf2/operator/operands/Canary.java b/operator/src/main/java/org/bf2/operator/operands/Canary.java
index 53b95c0e4..6b613c861 100644
--- a/operator/src/main/java/org/bf2/operator/operands/Canary.java
+++ b/operator/src/main/java/org/bf2/operator/operands/Canary.java
@@ -30,6 +30,7 @@
import org.bf2.common.OperandUtils;
import org.bf2.operator.managers.ImagePullSecretManager;
import org.bf2.operator.managers.IngressControllerManager;
+import org.bf2.operator.managers.OperandOverrideManager;
import org.bf2.operator.managers.SecuritySecretManager;
import org.bf2.operator.resources.v1alpha1.ManagedKafka;
import org.bf2.operator.resources.v1alpha1.ServiceAccount;
@@ -58,12 +59,6 @@ public class Canary extends AbstractCanary {
private static final String METRICS_PORT_NAME = "metrics";
private static final IntOrString METRICS_PORT_TARGET = new IntOrString(METRICS_PORT_NAME);
- @ConfigProperty(name = "image.canary")
- String canaryImage;
-
- @ConfigProperty(name = "image.canary-init")
- String canaryInitImage;
-
@ConfigProperty(name = "managedkafka.canary.producer-latency-buckets")
String producerLatencyBuckets;
@@ -91,6 +86,9 @@ public class Canary extends AbstractCanary {
@Inject
protected KafkaInstanceConfiguration config;
+ @Inject
+ protected OperandOverrideManager overrideManager;
+
@Override
public Deployment deploymentFrom(ManagedKafka managedKafka, Deployment current) {
String canaryName = canaryName(managedKafka);
@@ -192,7 +190,7 @@ private List buildVolumes(ManagedKafka managedKafka) {
protected Container buildInitContainer(ManagedKafka managedKafka, Deployment current) {
return new ContainerBuilder()
.withName("init")
- .withImage(canaryInitImage)
+ .withImage(overrideManager.getCanaryInitImage(managedKafka.getSpec().getVersions().getStrimzi()))
.withEnv(buildInitEnvVar(managedKafka))
.withResources(buildResources())
.withCommand("/opt/strimzi-canary-tool/canary-dns-init.sh")
@@ -208,7 +206,7 @@ protected boolean hasClusterSpecificBootstrapDomain(ManagedKafka managedKafka) {
protected List buildContainers(ManagedKafka managedKafka, Deployment current) {
Container container = new ContainerBuilder()
.withName("canary")
- .withImage(canaryImage)
+ .withImage(overrideManager.getCanaryImage(managedKafka.getSpec().getVersions().getStrimzi()))
.withEnv(buildEnvVar(managedKafka, current))
.withPorts(buildContainerPorts())
.withResources(buildResources())
diff --git a/operator/src/test/java/org/bf2/operator/managers/OperandOverrideManagerTest.java b/operator/src/test/java/org/bf2/operator/managers/OperandOverrideManagerTest.java
new file mode 100644
index 000000000..607f06d3f
--- /dev/null
+++ b/operator/src/test/java/org/bf2/operator/managers/OperandOverrideManagerTest.java
@@ -0,0 +1,62 @@
+package org.bf2.operator.managers;
+
+import io.fabric8.kubernetes.api.model.ConfigMapBuilder;
+import io.fabric8.kubernetes.client.KubernetesClient;
+import io.quarkus.test.common.QuarkusTestResource;
+import io.quarkus.test.junit.QuarkusTest;
+import io.quarkus.test.junit.TestProfile;
+import io.quarkus.test.kubernetes.client.KubernetesServerTestResource;
+import org.bf2.operator.MockProfile;
+import org.junit.jupiter.api.AfterEach;
+import org.junit.jupiter.api.Test;
+
+import javax.inject.Inject;
+
+import java.util.Collections;
+
+import static org.junit.jupiter.api.Assertions.assertEquals;
+import static org.junit.jupiter.api.Assertions.assertNotEquals;
+
+@QuarkusTestResource(KubernetesServerTestResource.class)
+@TestProfile(MockProfile.class)
+@QuarkusTest
+public class OperandOverrideManagerTest {
+
+ @Inject
+ KubernetesClient client;
+
+ @Inject
+ OperandOverrideManager overrideManager;
+
+ @AfterEach
+ public void cleanup() {
+ overrideManager.resetOverrides();
+ }
+
+ @Test
+ void testImageOverride() {
+ String versionString = "strimzi-cluster-operator-0.26-1";
+ String defaultVersion = overrideManager.getCanaryImage(versionString);
+
+ overrideManager.updateOverrides(new ConfigMapBuilder().withNewMetadata()
+ .withName(versionString)
+ .endMetadata()
+ .withData(Collections.singletonMap(OperandOverrideManager.OPERANDS_YAML,
+ "canary: \n"
+ + " image: something\n"
+ + " notused: value\n"
+ + " init: \n"
+ + " image: somethingelse\n"))
+ .build());
+
+ String override = overrideManager.getCanaryImage(versionString);
+
+ assertEquals("something", override);
+ assertNotEquals(defaultVersion, override);
+
+ String initOverride = overrideManager.getCanaryInitImage(versionString);
+
+ assertEquals("somethingelse", initOverride);
+ }
+
+}