diff --git a/api/src/main/java/io/strimzi/api/kafka/model/kafka/KafkaResources.java b/api/src/main/java/io/strimzi/api/kafka/model/kafka/KafkaResources.java index 22100c8990..bf1435f06f 100644 --- a/api/src/main/java/io/strimzi/api/kafka/model/kafka/KafkaResources.java +++ b/api/src/main/java/io/strimzi/api/kafka/model/kafka/KafkaResources.java @@ -61,15 +61,30 @@ public static String kafkaComponentName(String clusterName) { } /** - * Returns the name of the Kafka {@code Pod} for a {@code Kafka} cluster of the given name. - * @param clusterName The {@code metadata.name} of the {@code Kafka} resource. - * @param podNum The number of the Kafka pod + * Returns the name of the Kafka {@code Pod} for a {@code Kafka} cluster not using {@code KafkaNodePool} resources. + * + * @param clusterName The {@code metadata.name} of the {@code Kafka} resource. + * @param podNum The ordinal number of the Kafka pod + * * @return The name of the corresponding Kafka {@code Pod}. */ public static String kafkaPodName(String clusterName, int podNum) { return kafkaComponentName(clusterName) + "-" + podNum; } + /** + * Returns the name of the Kafka {@code Pod} for a {@code Kafka} cluster using {@code KafkaNodePool} resources. + * + * @param clusterName The {@code metadata.name} of the {@code Kafka} resource + * @param nodePoolName The {@code metadata.name} of the {@code KafkaNodePool} resource + * @param podNum The ordinal number of the Kafka pod + * + * @return The name of the corresponding Kafka {@code Pod}. + */ + public static String kafkaPodName(String clusterName, String nodePoolName, int podNum) { + return clusterName + "-" + nodePoolName + "-" + podNum; + } + /** * Returns the name of the internal bootstrap {@code Service} for a {@code Kafka} cluster of the given name. * @param clusterName The {@code metadata.name} of the {@code Kafka} resource. diff --git a/cluster-operator/src/test/java/io/strimzi/operator/cluster/operator/assembly/JbodStorageMockTest.java b/cluster-operator/src/test/java/io/strimzi/operator/cluster/operator/assembly/JbodStorageMockTest.java index 8ecc314a00..89afc07ab2 100644 --- a/cluster-operator/src/test/java/io/strimzi/operator/cluster/operator/assembly/JbodStorageMockTest.java +++ b/cluster-operator/src/test/java/io/strimzi/operator/cluster/operator/assembly/JbodStorageMockTest.java @@ -17,6 +17,9 @@ import io.strimzi.api.kafka.model.kafka.SingleVolumeStorage; import io.strimzi.api.kafka.model.kafka.listener.GenericKafkaListenerBuilder; import io.strimzi.api.kafka.model.kafka.listener.KafkaListenerType; +import io.strimzi.api.kafka.model.nodepool.KafkaNodePool; +import io.strimzi.api.kafka.model.nodepool.KafkaNodePoolBuilder; +import io.strimzi.api.kafka.model.nodepool.ProcessRoles; import io.strimzi.operator.cluster.ClusterOperatorConfig; import io.strimzi.operator.cluster.KafkaVersionTestUtils; import io.strimzi.operator.cluster.PlatformFeaturesAvailability; @@ -49,6 +52,7 @@ import java.util.HashSet; import java.util.List; import java.util.Locale; +import java.util.Map; import java.util.Set; import java.util.stream.Collectors; @@ -57,7 +61,8 @@ @ExtendWith(VertxExtension.class) public class JbodStorageMockTest { - private static final String NAME = "my-kafka"; + private static final String NAME = "my-cluster"; + private static final String NODE_POOL_NAME = "mixed"; private static final KafkaVersion.Lookup VERSIONS = KafkaVersionTestUtils.getKafkaVersionLookup(); private static Vertx vertx; @@ -66,7 +71,7 @@ public class JbodStorageMockTest { private static MockKube3 mockKube; private String namespace = "test-jbod-storage"; - private Kafka kafka; + private KafkaNodePool kafkaNodePool; private KafkaAssemblyOperator operator; private StrimziPodSetController podSetController; @@ -116,41 +121,47 @@ public void beforeEach(TestInfo testInfo) { .withDeleteClaim(false) .withSize("100Gi").build()); - this.kafka = new KafkaBuilder() + Kafka kafka = new KafkaBuilder() .withNewMetadata() .withNamespace(namespace) .withName(NAME) + .withAnnotations(Map.of(Annotations.ANNO_STRIMZI_IO_NODE_POOLS, "enabled", Annotations.ANNO_STRIMZI_IO_KRAFT, "enabled")) .endMetadata() .withNewSpec() .withNewKafka() - .withReplicas(3) .withListeners(new GenericKafkaListenerBuilder() .withName("plain") .withPort(9092) .withType(KafkaListenerType.INTERNAL) .withTls(false) .build()) - .withNewJbodStorage() - .withVolumes(volumes) - .endJbodStorage() .endKafka() - .withNewZookeeper() - .withReplicas(1) - .withNewEphemeralStorage() - .endEphemeralStorage() - .endZookeeper() .endSpec() .build(); - Crds.kafkaOperation(client).inNamespace(namespace).resource(kafka).create(); + this.kafkaNodePool = new KafkaNodePoolBuilder() + .withNewMetadata() + .withName(NODE_POOL_NAME) + .withNamespace(namespace) + .withLabels(Map.of(Labels.STRIMZI_CLUSTER_LABEL, NAME)) + .withGeneration(1L) + .endMetadata() + .withNewSpec() + .withReplicas(3) + .withNewJbodStorage() + .withVolumes(volumes) + .endJbodStorage() + .withRoles(ProcessRoles.CONTROLLER, ProcessRoles.BROKER) + .endSpec() + .build(); + Crds.kafkaNodePoolOperation(client).inNamespace(namespace).resource(kafkaNodePool).create(); + PlatformFeaturesAvailability pfa = new PlatformFeaturesAvailability(false, KubernetesVersion.MINIMAL_SUPPORTED_VERSION); // creating the Kafka operator ResourceOperatorSupplier ros = - new ResourceOperatorSupplier(JbodStorageMockTest.vertx, client, - ResourceUtils.zookeeperLeaderFinder(JbodStorageMockTest.vertx, client), - ResourceUtils.adminClientProvider(), ResourceUtils.zookeeperScalerProvider(), ResourceUtils.kafkaAgentClientProvider(), - ResourceUtils.metricsProvider(), ResourceUtils.zooKeeperAdminProvider(), pfa, 60_000L); + new ResourceOperatorSupplier(vertx, client, null, ResourceUtils.adminClientProvider(), null, + ResourceUtils.kafkaAgentClientProvider(), ResourceUtils.metricsProvider(), null, pfa, 60_000L); podSetController = new StrimziPodSetController(namespace, Labels.EMPTY, ros.kafkaOperator, ros.connectOperator, ros.mirrorMaker2Operator, ros.strimziPodSetOperator, ros.podOperations, ros.metricsProvider, Integer.parseInt(ClusterOperatorConfig.POD_SET_CONTROLLER_WORK_QUEUE_SIZE.defaultValue())); podSetController.start(); @@ -172,11 +183,10 @@ public void testJbodStorageCreatesPVCsMatchingKafkaVolumes(VertxTestContext cont .onComplete(context.succeeding(v -> context.verify(() -> { List pvcs = getPvcs(); - for (int i = 0; i < this.kafka.getSpec().getKafka().getReplicas(); i++) { + for (int i = 0; i < this.kafkaNodePool.getSpec().getReplicas(); i++) { for (SingleVolumeStorage volume : this.volumes) { if (volume instanceof PersistentClaimStorage) { - - String expectedPvcName = VolumeUtils.createVolumePrefix(volume.getId(), true) + "-" + KafkaResources.kafkaPodName(NAME, i); + String expectedPvcName = VolumeUtils.createVolumePrefix(volume.getId(), true) + "-" + KafkaResources.kafkaPodName(NAME, NODE_POOL_NAME, i); List matchingPvcs = pvcs.stream() .filter(pvc -> pvc.getMetadata().getName().equals(expectedPvcName)) .collect(Collectors.toList()); @@ -208,16 +218,14 @@ public void testReconcileWithNewVolumeAddedToJbodStorage(VertxTestContext contex .withDeleteClaim(false) .withSize("100Gi").build()); - Kafka kafkaWithNewJbodVolume = new KafkaBuilder(kafka) + KafkaNodePool kafkaNodePoolWithNewJbodVolume = new KafkaNodePoolBuilder(kafkaNodePool) .editSpec() - .editKafka() - .withStorage(new JbodStorageBuilder().withVolumes(volumes).build()) - .endKafka() + .withStorage(new JbodStorageBuilder().withVolumes(volumes).build()) .endSpec() .build(); - Set expectedPvcs = expectedPvcs(kafka); - Set expectedPvcsWithNewJbodStorageVolume = expectedPvcs(kafkaWithNewJbodVolume); + Set expectedPvcs = expectedPvcs(kafkaNodePool); + Set expectedPvcsWithNewJbodStorageVolume = expectedPvcs(kafkaNodePoolWithNewJbodVolume); // reconcile for kafka cluster creation operator.reconcile(new Reconciliation("test-trigger", Kafka.RESOURCE_KIND, namespace, NAME)) @@ -227,7 +235,7 @@ public void testReconcileWithNewVolumeAddedToJbodStorage(VertxTestContext contex assertThat(pvcsNames, is(expectedPvcs)); }))) .compose(v -> { - Crds.kafkaOperation(client).inNamespace(namespace).withName(NAME).patch(kafkaWithNewJbodVolume); + Crds.kafkaNodePoolOperation(client).inNamespace(namespace).withName(NODE_POOL_NAME).patch(kafkaNodePoolWithNewJbodVolume); // reconcile kafka cluster with new Jbod storage return operator.reconcile(new Reconciliation("test-trigger", Kafka.RESOURCE_KIND, namespace, NAME)); }) @@ -248,16 +256,14 @@ public void testReconcileWithVolumeRemovedFromJbodStorage(VertxTestContext conte // remove a volume from the Jbod Storage volumes.remove(0); - Kafka kafkaWithRemovedJbodVolume = new KafkaBuilder(this.kafka) + KafkaNodePool kafkaNodePoolWithNewJbodVolume = new KafkaNodePoolBuilder(kafkaNodePool) .editSpec() - .editKafka() - .withStorage(new JbodStorageBuilder().withVolumes(volumes).build()) - .endKafka() + .withStorage(new JbodStorageBuilder().withVolumes(volumes).build()) .endSpec() .build(); - Set expectedPvcs = expectedPvcs(kafka); - Set expectedPvcsWithRemovedJbodStorageVolume = expectedPvcs(kafkaWithRemovedJbodVolume); + Set expectedPvcs = expectedPvcs(kafkaNodePool); + Set expectedPvcsWithRemovedJbodStorageVolume = expectedPvcs(kafkaNodePoolWithNewJbodVolume); // reconcile for kafka cluster creation operator.reconcile(new Reconciliation("test-trigger", Kafka.RESOURCE_KIND, namespace, NAME)) @@ -267,7 +273,7 @@ public void testReconcileWithVolumeRemovedFromJbodStorage(VertxTestContext conte assertThat(pvcsNames, is(expectedPvcs)); }))) .compose(v -> { - Crds.kafkaOperation(client).inNamespace(namespace).withName(NAME).patch(kafkaWithRemovedJbodVolume); + Crds.kafkaNodePoolOperation(client).inNamespace(namespace).withName(NODE_POOL_NAME).patch(kafkaNodePoolWithNewJbodVolume); // reconcile kafka cluster with a Jbod storage volume removed return operator.reconcile(new Reconciliation("test-trigger2", Kafka.RESOURCE_KIND, namespace, NAME)); }) @@ -286,16 +292,14 @@ public void testReconcileWithUpdateVolumeIdJbod(VertxTestContext context) { // trying to update id for a volume from in the JBOD storage volumes.get(0).setId(3); - Kafka kafkaWithUpdatedJbodVolume = new KafkaBuilder(this.kafka) + KafkaNodePool kafkaNodePoolWithNewJbodVolume = new KafkaNodePoolBuilder(kafkaNodePool) .editSpec() - .editKafka() - .withStorage(new JbodStorageBuilder().withVolumes(volumes).build()) - .endKafka() + .withStorage(new JbodStorageBuilder().withVolumes(volumes).build()) .endSpec() .build(); - Set expectedPvcs = expectedPvcs(kafka); - Set expectedPvcsWithUpdatedJbodStorageVolume = expectedPvcs(kafkaWithUpdatedJbodVolume); + Set expectedPvcs = expectedPvcs(kafkaNodePool); + Set expectedPvcsWithUpdatedJbodStorageVolume = expectedPvcs(kafkaNodePoolWithNewJbodVolume); // reconcile for kafka cluster creation operator.reconcile(new Reconciliation("test-trigger", Kafka.RESOURCE_KIND, namespace, NAME)) @@ -305,7 +309,7 @@ public void testReconcileWithUpdateVolumeIdJbod(VertxTestContext context) { assertThat(pvcsNames, is(expectedPvcs)); }))) .compose(v -> { - Crds.kafkaOperation(client).inNamespace(namespace).withName(NAME).patch(kafkaWithUpdatedJbodVolume); + Crds.kafkaNodePoolOperation(client).inNamespace(namespace).withName(NODE_POOL_NAME).patch(kafkaNodePoolWithNewJbodVolume); // reconcile kafka cluster with a Jbod storage volume removed return operator.reconcile(new Reconciliation("test-trigger2", Kafka.RESOURCE_KIND, namespace, NAME)); }) @@ -317,13 +321,12 @@ public void testReconcileWithUpdateVolumeIdJbod(VertxTestContext context) { }))); } - private Set expectedPvcs(Kafka kafka) { + private Set expectedPvcs(KafkaNodePool nodePool) { Set expectedPvcs = new HashSet<>(); - for (int i = 0; i < kafka.getSpec().getKafka().getReplicas(); i++) { - for (SingleVolumeStorage volume : ((JbodStorage) kafka.getSpec().getKafka().getStorage()).getVolumes()) { + for (int i = 0; i < nodePool.getSpec().getReplicas(); i++) { + for (SingleVolumeStorage volume : ((JbodStorage) nodePool.getSpec().getStorage()).getVolumes()) { if (volume instanceof PersistentClaimStorage) { - expectedPvcs.add(VolumeUtils.DATA_VOLUME_NAME + "-" + volume.getId() + "-" - + KafkaResources.kafkaPodName(NAME, i)); + expectedPvcs.add(VolumeUtils.DATA_VOLUME_NAME + "-" + volume.getId() + "-" + KafkaResources.kafkaPodName(NAME, NODE_POOL_NAME, i)); } } } @@ -331,8 +334,7 @@ private Set expectedPvcs(Kafka kafka) { } private List getPvcs() { - String kafkaStsName = KafkaResources.kafkaComponentName(JbodStorageMockTest.NAME); - Labels pvcSelector = Labels.forStrimziCluster(JbodStorageMockTest.NAME).withStrimziKind(Kafka.RESOURCE_KIND).withStrimziName(kafkaStsName); + Labels pvcSelector = Labels.forStrimziCluster(NAME).withStrimziKind(Kafka.RESOURCE_KIND).withStrimziName(KafkaResources.kafkaComponentName(NAME)); return client.persistentVolumeClaims() .inNamespace(namespace) .withLabels(pvcSelector.toMap()) diff --git a/cluster-operator/src/test/java/io/strimzi/operator/cluster/operator/assembly/JbodStorageMockZooBasedTest.java b/cluster-operator/src/test/java/io/strimzi/operator/cluster/operator/assembly/JbodStorageMockZooBasedTest.java new file mode 100644 index 0000000000..43ce040cfc --- /dev/null +++ b/cluster-operator/src/test/java/io/strimzi/operator/cluster/operator/assembly/JbodStorageMockZooBasedTest.java @@ -0,0 +1,341 @@ +/* + * Copyright Strimzi authors. + * License: Apache License 2.0 (see the file LICENSE or http://apache.org/licenses/LICENSE-2.0.html). + */ +package io.strimzi.operator.cluster.operator.assembly; + +import io.fabric8.kubernetes.api.model.PersistentVolumeClaim; +import io.fabric8.kubernetes.client.KubernetesClient; +import io.strimzi.api.kafka.Crds; +import io.strimzi.api.kafka.model.kafka.JbodStorage; +import io.strimzi.api.kafka.model.kafka.JbodStorageBuilder; +import io.strimzi.api.kafka.model.kafka.Kafka; +import io.strimzi.api.kafka.model.kafka.KafkaBuilder; +import io.strimzi.api.kafka.model.kafka.KafkaResources; +import io.strimzi.api.kafka.model.kafka.PersistentClaimStorage; +import io.strimzi.api.kafka.model.kafka.PersistentClaimStorageBuilder; +import io.strimzi.api.kafka.model.kafka.SingleVolumeStorage; +import io.strimzi.api.kafka.model.kafka.listener.GenericKafkaListenerBuilder; +import io.strimzi.api.kafka.model.kafka.listener.KafkaListenerType; +import io.strimzi.operator.cluster.ClusterOperatorConfig; +import io.strimzi.operator.cluster.KafkaVersionTestUtils; +import io.strimzi.operator.cluster.PlatformFeaturesAvailability; +import io.strimzi.operator.cluster.ResourceUtils; +import io.strimzi.operator.cluster.model.KafkaVersion; +import io.strimzi.operator.cluster.model.VolumeUtils; +import io.strimzi.operator.cluster.operator.resource.ResourceOperatorSupplier; +import io.strimzi.operator.common.Annotations; +import io.strimzi.operator.common.Reconciliation; +import io.strimzi.operator.common.model.Labels; +import io.strimzi.operator.common.model.PasswordGenerator; +import io.strimzi.operator.common.operator.MockCertManager; +import io.strimzi.platform.KubernetesVersion; +import io.strimzi.test.mockkube3.MockKube3; +import io.vertx.core.Vertx; +import io.vertx.core.WorkerExecutor; +import io.vertx.junit5.Checkpoint; +import io.vertx.junit5.VertxExtension; +import io.vertx.junit5.VertxTestContext; +import org.hamcrest.Matchers; +import org.junit.jupiter.api.AfterAll; +import org.junit.jupiter.api.AfterEach; +import org.junit.jupiter.api.BeforeAll; +import org.junit.jupiter.api.BeforeEach; +import org.junit.jupiter.api.Test; +import org.junit.jupiter.api.TestInfo; +import org.junit.jupiter.api.extension.ExtendWith; + +import java.util.ArrayList; +import java.util.HashSet; +import java.util.List; +import java.util.Locale; +import java.util.Set; +import java.util.stream.Collectors; + +import static org.hamcrest.CoreMatchers.is; +import static org.hamcrest.MatcherAssert.assertThat; + +@ExtendWith(VertxExtension.class) +public class JbodStorageMockZooBasedTest { + private static final String NAME = "my-kafka"; + private static final KafkaVersion.Lookup VERSIONS = KafkaVersionTestUtils.getKafkaVersionLookup(); + + private static Vertx vertx; + private static WorkerExecutor sharedWorkerExecutor; + private static KubernetesClient client; + private static MockKube3 mockKube; + + private String namespace = "test-jbod-storage"; + private Kafka kafka; + private KafkaAssemblyOperator operator; + private StrimziPodSetController podSetController; + + private List volumes; + + @BeforeAll + public static void beforeAll() { + // Configure the Kubernetes Mock + mockKube = new MockKube3.MockKube3Builder() + .withKafkaNodePoolCrd() + .withKafkaCrd() + .withKafkaConnectCrd() + .withKafkaMirrorMaker2Crd() + .withStrimziPodSetCrd() + .withDeploymentController() + .withPodController() + .withServiceController() + .withDeletionController() + .build(); + mockKube.start(); + client = mockKube.client(); + + vertx = Vertx.vertx(); + sharedWorkerExecutor = vertx.createSharedWorkerExecutor("kubernetes-ops-pool"); + } + + @AfterAll + public static void afterAll() { + mockKube.stop(); + sharedWorkerExecutor.close(); + vertx.close(); + } + + @BeforeEach + public void beforeEach(TestInfo testInfo) { + namespace = testInfo.getTestMethod().orElseThrow().getName().toLowerCase(Locale.ROOT); + mockKube.prepareNamespace(namespace); + + this.volumes = new ArrayList<>(2); + + volumes.add(new PersistentClaimStorageBuilder() + .withId(0) + .withDeleteClaim(true) + .withSize("100Gi").build()); + volumes.add(new PersistentClaimStorageBuilder() + .withId(1) + .withDeleteClaim(false) + .withSize("100Gi").build()); + + this.kafka = new KafkaBuilder() + .withNewMetadata() + .withNamespace(namespace) + .withName(NAME) + .endMetadata() + .withNewSpec() + .withNewKafka() + .withReplicas(3) + .withListeners(new GenericKafkaListenerBuilder() + .withName("plain") + .withPort(9092) + .withType(KafkaListenerType.INTERNAL) + .withTls(false) + .build()) + .withNewJbodStorage() + .withVolumes(volumes) + .endJbodStorage() + .endKafka() + .withNewZookeeper() + .withReplicas(1) + .withNewEphemeralStorage() + .endEphemeralStorage() + .endZookeeper() + .endSpec() + .build(); + + Crds.kafkaOperation(client).inNamespace(namespace).resource(kafka).create(); + + PlatformFeaturesAvailability pfa = new PlatformFeaturesAvailability(false, KubernetesVersion.MINIMAL_SUPPORTED_VERSION); + // creating the Kafka operator + ResourceOperatorSupplier ros = + new ResourceOperatorSupplier(JbodStorageMockZooBasedTest.vertx, client, + ResourceUtils.zookeeperLeaderFinder(JbodStorageMockZooBasedTest.vertx, client), + ResourceUtils.adminClientProvider(), ResourceUtils.zookeeperScalerProvider(), ResourceUtils.kafkaAgentClientProvider(), + ResourceUtils.metricsProvider(), ResourceUtils.zooKeeperAdminProvider(), pfa, 60_000L); + + podSetController = new StrimziPodSetController(namespace, Labels.EMPTY, ros.kafkaOperator, ros.connectOperator, ros.mirrorMaker2Operator, ros.strimziPodSetOperator, ros.podOperations, ros.metricsProvider, Integer.parseInt(ClusterOperatorConfig.POD_SET_CONTROLLER_WORK_QUEUE_SIZE.defaultValue())); + podSetController.start(); + + this.operator = new KafkaAssemblyOperator(JbodStorageMockZooBasedTest.vertx, pfa, new MockCertManager(), + new PasswordGenerator(10, "a", "a"), ros, + ResourceUtils.dummyClusterOperatorConfig(VERSIONS)); + } + + @AfterEach + public void afterEach() { + podSetController.stop(); + } + + @Test + public void testJbodStorageCreatesPVCsMatchingKafkaVolumes(VertxTestContext context) { + Checkpoint async = context.checkpoint(); + operator.reconcile(new Reconciliation("test-trigger", Kafka.RESOURCE_KIND, namespace, NAME)) + .onComplete(context.succeeding(v -> context.verify(() -> { + List pvcs = getPvcs(); + + for (int i = 0; i < this.kafka.getSpec().getKafka().getReplicas(); i++) { + for (SingleVolumeStorage volume : this.volumes) { + if (volume instanceof PersistentClaimStorage) { + + String expectedPvcName = VolumeUtils.createVolumePrefix(volume.getId(), true) + "-" + KafkaResources.kafkaPodName(NAME, i); + List matchingPvcs = pvcs.stream() + .filter(pvc -> pvc.getMetadata().getName().equals(expectedPvcName)) + .collect(Collectors.toList()); + assertThat("Exactly one pvc should have the name " + expectedPvcName + " in :\n" + pvcs, + matchingPvcs, Matchers.hasSize(1)); + + PersistentVolumeClaim pvc = matchingPvcs.get(0); + boolean isDeleteClaim = ((PersistentClaimStorage) volume).isDeleteClaim(); + assertThat("deleteClaim value did not match for volume : " + volume, + Annotations.booleanAnnotation(pvc, Annotations.ANNO_STRIMZI_IO_DELETE_CLAIM, + false), + is(isDeleteClaim)); + + } + } + } + + async.flag(); + }))); + } + + @Test + public void testReconcileWithNewVolumeAddedToJbodStorage(VertxTestContext context) { + Checkpoint async = context.checkpoint(); + + // Add a new volume to Jbod Storage + volumes.add(new PersistentClaimStorageBuilder() + .withId(2) + .withDeleteClaim(false) + .withSize("100Gi").build()); + + Kafka kafkaWithNewJbodVolume = new KafkaBuilder(kafka) + .editSpec() + .editKafka() + .withStorage(new JbodStorageBuilder().withVolumes(volumes).build()) + .endKafka() + .endSpec() + .build(); + + Set expectedPvcs = expectedPvcs(kafka); + Set expectedPvcsWithNewJbodStorageVolume = expectedPvcs(kafkaWithNewJbodVolume); + + // reconcile for kafka cluster creation + operator.reconcile(new Reconciliation("test-trigger", Kafka.RESOURCE_KIND, namespace, NAME)) + .onComplete(context.succeeding(v -> context.verify(() -> { + List pvcs = getPvcs(); + Set pvcsNames = pvcs.stream().map(pvc -> pvc.getMetadata().getName()).collect(Collectors.toSet()); + assertThat(pvcsNames, is(expectedPvcs)); + }))) + .compose(v -> { + Crds.kafkaOperation(client).inNamespace(namespace).withName(NAME).patch(kafkaWithNewJbodVolume); + // reconcile kafka cluster with new Jbod storage + return operator.reconcile(new Reconciliation("test-trigger", Kafka.RESOURCE_KIND, namespace, NAME)); + }) + .onComplete(context.succeeding(v -> context.verify(() -> { + List pvcs = getPvcs(); + Set pvcsNames = pvcs.stream().map(pvc -> pvc.getMetadata().getName()).collect(Collectors.toSet()); + assertThat(pvcsNames, is(expectedPvcsWithNewJbodStorageVolume)); + async.flag(); + }))); + + + } + + @Test + public void testReconcileWithVolumeRemovedFromJbodStorage(VertxTestContext context) { + Checkpoint async = context.checkpoint(); + + // remove a volume from the Jbod Storage + volumes.remove(0); + + Kafka kafkaWithRemovedJbodVolume = new KafkaBuilder(this.kafka) + .editSpec() + .editKafka() + .withStorage(new JbodStorageBuilder().withVolumes(volumes).build()) + .endKafka() + .endSpec() + .build(); + + Set expectedPvcs = expectedPvcs(kafka); + Set expectedPvcsWithRemovedJbodStorageVolume = expectedPvcs(kafkaWithRemovedJbodVolume); + + // reconcile for kafka cluster creation + operator.reconcile(new Reconciliation("test-trigger", Kafka.RESOURCE_KIND, namespace, NAME)) + .onComplete(context.succeeding(v -> context.verify(() -> { + List pvcs = getPvcs(); + Set pvcsNames = pvcs.stream().map(pvc -> pvc.getMetadata().getName()).collect(Collectors.toSet()); + assertThat(pvcsNames, is(expectedPvcs)); + }))) + .compose(v -> { + Crds.kafkaOperation(client).inNamespace(namespace).withName(NAME).patch(kafkaWithRemovedJbodVolume); + // reconcile kafka cluster with a Jbod storage volume removed + return operator.reconcile(new Reconciliation("test-trigger2", Kafka.RESOURCE_KIND, namespace, NAME)); + }) + .onComplete(context.succeeding(v -> context.verify(() -> { + List pvcs = getPvcs(); + Set pvcsNames = pvcs.stream().map(pvc -> pvc.getMetadata().getName()).collect(Collectors.toSet()); + assertThat(pvcsNames, is(expectedPvcsWithRemovedJbodStorageVolume)); + async.flag(); + }))); + } + + @Test + public void testReconcileWithUpdateVolumeIdJbod(VertxTestContext context) { + Checkpoint async = context.checkpoint(); + + // trying to update id for a volume from in the JBOD storage + volumes.get(0).setId(3); + + Kafka kafkaWithUpdatedJbodVolume = new KafkaBuilder(this.kafka) + .editSpec() + .editKafka() + .withStorage(new JbodStorageBuilder().withVolumes(volumes).build()) + .endKafka() + .endSpec() + .build(); + + Set expectedPvcs = expectedPvcs(kafka); + Set expectedPvcsWithUpdatedJbodStorageVolume = expectedPvcs(kafkaWithUpdatedJbodVolume); + + // reconcile for kafka cluster creation + operator.reconcile(new Reconciliation("test-trigger", Kafka.RESOURCE_KIND, namespace, NAME)) + .onComplete(context.succeeding(v -> context.verify(() -> { + List pvcs = getPvcs(); + Set pvcsNames = pvcs.stream().map(pvc -> pvc.getMetadata().getName()).collect(Collectors.toSet()); + assertThat(pvcsNames, is(expectedPvcs)); + }))) + .compose(v -> { + Crds.kafkaOperation(client).inNamespace(namespace).withName(NAME).patch(kafkaWithUpdatedJbodVolume); + // reconcile kafka cluster with a Jbod storage volume removed + return operator.reconcile(new Reconciliation("test-trigger2", Kafka.RESOURCE_KIND, namespace, NAME)); + }) + .onComplete(context.succeeding(v -> context.verify(() -> { + List pvcs = getPvcs(); + Set pvcsNames = pvcs.stream().map(pvc -> pvc.getMetadata().getName()).collect(Collectors.toSet()); + assertThat(pvcsNames, is(expectedPvcsWithUpdatedJbodStorageVolume)); + async.flag(); + }))); + } + + private Set expectedPvcs(Kafka kafka) { + Set expectedPvcs = new HashSet<>(); + for (int i = 0; i < kafka.getSpec().getKafka().getReplicas(); i++) { + for (SingleVolumeStorage volume : ((JbodStorage) kafka.getSpec().getKafka().getStorage()).getVolumes()) { + if (volume instanceof PersistentClaimStorage) { + expectedPvcs.add(VolumeUtils.DATA_VOLUME_NAME + "-" + volume.getId() + "-" + + KafkaResources.kafkaPodName(NAME, i)); + } + } + } + return expectedPvcs; + } + + private List getPvcs() { + String kafkaStsName = KafkaResources.kafkaComponentName(JbodStorageMockZooBasedTest.NAME); + Labels pvcSelector = Labels.forStrimziCluster(JbodStorageMockZooBasedTest.NAME).withStrimziKind(Kafka.RESOURCE_KIND).withStrimziName(kafkaStsName); + return client.persistentVolumeClaims() + .inNamespace(namespace) + .withLabels(pvcSelector.toMap()) + .list().getItems(); + } +} diff --git a/cluster-operator/src/test/java/io/strimzi/operator/cluster/operator/assembly/KafkaExporterReconcilerTest.java b/cluster-operator/src/test/java/io/strimzi/operator/cluster/operator/assembly/KafkaExporterReconcilerTest.java index 1e72054db5..c762361957 100644 --- a/cluster-operator/src/test/java/io/strimzi/operator/cluster/operator/assembly/KafkaExporterReconcilerTest.java +++ b/cluster-operator/src/test/java/io/strimzi/operator/cluster/operator/assembly/KafkaExporterReconcilerTest.java @@ -11,6 +11,8 @@ import io.strimzi.api.kafka.model.kafka.Kafka; import io.strimzi.api.kafka.model.kafka.KafkaBuilder; import io.strimzi.api.kafka.model.kafka.exporter.KafkaExporterResources; +import io.strimzi.api.kafka.model.kafka.listener.GenericKafkaListenerBuilder; +import io.strimzi.api.kafka.model.kafka.listener.KafkaListenerType; import io.strimzi.operator.cluster.ClusterOperatorConfig; import io.strimzi.operator.cluster.KafkaVersionTestUtils; import io.strimzi.operator.cluster.ResourceUtils; @@ -37,6 +39,7 @@ import java.time.Clock; import java.util.List; +import java.util.Map; import static org.hamcrest.CoreMatchers.is; import static org.hamcrest.CoreMatchers.notNullValue; @@ -63,6 +66,29 @@ public class KafkaExporterReconcilerTest { ResourceUtils.createInitialCaCertSecret(NAMESPACE, NAME, AbstractModel.clusterCaCertSecretName(NAME), MockCertManager.clusterCaCert(), MockCertManager.clusterCaCertStore(), "123456"), ResourceUtils.createInitialCaKeySecret(NAMESPACE, NAME, AbstractModel.clusterCaKeySecretName(NAME), MockCertManager.clusterCaKey()) ); + private final static Kafka KAFKA = new KafkaBuilder() + .withNewMetadata() + .withNamespace(NAMESPACE) + .withName(NAME) + .withAnnotations(Map.of(Annotations.ANNO_STRIMZI_IO_NODE_POOLS, "enabled", Annotations.ANNO_STRIMZI_IO_KRAFT, "enabled")) + .endMetadata() + .withNewSpec() + .withNewKafka() + .withListeners(new GenericKafkaListenerBuilder() + .withName("plain") + .withPort(9092) + .withType(KafkaListenerType.INTERNAL) + .withTls(false) + .build(), + new GenericKafkaListenerBuilder() + .withName("tls") + .withPort(9093) + .withType(KafkaListenerType.INTERNAL) + .withTls(true) + .build()) + .endKafka() + .endSpec() + .build(); /* * Tests Kafka Exporter reconciliation when Kafka Exporter is enabled. In such case, the KE Deployment and all other @@ -91,7 +117,7 @@ public void reconcileWithEnabledExporter(VertxTestContext context) { when(mockDepOps.waitForObserved(any(), eq(NAMESPACE), eq(KafkaExporterResources.componentName(NAME)), anyLong(), anyLong())).thenReturn(Future.succeededFuture()); when(mockDepOps.readiness(any(), eq(NAMESPACE), eq(KafkaExporterResources.componentName(NAME)), anyLong(), anyLong())).thenReturn(Future.succeededFuture()); - Kafka kafka = new KafkaBuilder(ResourceUtils.createKafka(NAMESPACE, NAME, 3, "foo", 120, 30)) + Kafka kafka = new KafkaBuilder(KAFKA) .editSpec() .withNewKafkaExporter() .endKafkaExporter() @@ -161,7 +187,7 @@ public void reconcileWithEnabledExporterWithoutNetworkPolicies(VertxTestContext when(mockDepOps.waitForObserved(any(), eq(NAMESPACE), eq(KafkaExporterResources.componentName(NAME)), anyLong(), anyLong())).thenReturn(Future.succeededFuture()); when(mockDepOps.readiness(any(), eq(NAMESPACE), eq(KafkaExporterResources.componentName(NAME)), anyLong(), anyLong())).thenReturn(Future.succeededFuture()); - Kafka kafka = new KafkaBuilder(ResourceUtils.createKafka(NAMESPACE, NAME, 3, "foo", 120, 30)) + Kafka kafka = new KafkaBuilder(KAFKA) .editSpec() .withNewKafkaExporter() .endKafkaExporter() @@ -222,13 +248,11 @@ public void reconcileWithDisabledExporter(VertxTestContext context) { when(mockDepOps.waitForObserved(any(), eq(NAMESPACE), eq(KafkaExporterResources.componentName(NAME)), anyLong(), anyLong())).thenReturn(Future.succeededFuture()); when(mockDepOps.readiness(any(), eq(NAMESPACE), eq(KafkaExporterResources.componentName(NAME)), anyLong(), anyLong())).thenReturn(Future.succeededFuture()); - Kafka kafka = ResourceUtils.createKafka(NAMESPACE, NAME, 3, "foo", 120, 30); - KafkaExporterReconciler reconciler = new KafkaExporterReconciler( Reconciliation.DUMMY_RECONCILIATION, ResourceUtils.dummyClusterOperatorConfig(), supplier, - kafka, + KAFKA, VERSIONS, CLUSTER_CA ); @@ -278,14 +302,12 @@ public void reconcileWithDisabledExporterWithoutNetworkPolicies(VertxTestContext when(mockDepOps.waitForObserved(any(), eq(NAMESPACE), eq(KafkaExporterResources.componentName(NAME)), anyLong(), anyLong())).thenReturn(Future.succeededFuture()); when(mockDepOps.readiness(any(), eq(NAMESPACE), eq(KafkaExporterResources.componentName(NAME)), anyLong(), anyLong())).thenReturn(Future.succeededFuture()); - Kafka kafka = ResourceUtils.createKafka(NAMESPACE, NAME, 3, "foo", 120, 30); - KafkaExporterReconciler reconciler = new KafkaExporterReconciler( Reconciliation.DUMMY_RECONCILIATION, new ClusterOperatorConfig.ClusterOperatorConfigBuilder(ResourceUtils.dummyClusterOperatorConfig(), KafkaVersionTestUtils.getKafkaVersionLookup()) .with(ClusterOperatorConfig.NETWORK_POLICY_GENERATION.key(), "false").build(), supplier, - kafka, + KAFKA, VERSIONS, CLUSTER_CA ); diff --git a/cluster-operator/src/test/java/io/strimzi/operator/cluster/operator/assembly/ReconnectingWatcherMockTest.java b/cluster-operator/src/test/java/io/strimzi/operator/cluster/operator/assembly/ReconnectingWatcherMockTest.java index 5826f6ca45..592a5807c5 100644 --- a/cluster-operator/src/test/java/io/strimzi/operator/cluster/operator/assembly/ReconnectingWatcherMockTest.java +++ b/cluster-operator/src/test/java/io/strimzi/operator/cluster/operator/assembly/ReconnectingWatcherMockTest.java @@ -104,15 +104,12 @@ public void testWatch() throws InterruptedException { .endMetadata() .withNewSpec() .withNewKafka() - .withReplicas(1) .withListeners(new GenericKafkaListenerBuilder() .withName("plain") .withPort(9092) .withType(KafkaListenerType.INTERNAL) .withTls(false) .build()) - .withNewEphemeralStorage() - .endEphemeralStorage() .endKafka() .endSpec() .build(); @@ -171,15 +168,12 @@ public void testWatchAllNamespaces() throws InterruptedException { .endMetadata() .withNewSpec() .withNewKafka() - .withReplicas(1) .withListeners(new GenericKafkaListenerBuilder() .withName("plain") .withPort(9092) .withType(KafkaListenerType.INTERNAL) .withTls(false) .build()) - .withNewEphemeralStorage() - .endEphemeralStorage() .endKafka() .endSpec() .build(); @@ -241,15 +235,12 @@ public void testWatchWithSelector() throws InterruptedException { .endMetadata() .withNewSpec() .withNewKafka() - .withReplicas(1) .withListeners(new GenericKafkaListenerBuilder() .withName("plain") .withPort(9092) .withType(KafkaListenerType.INTERNAL) .withTls(false) .build()) - .withNewEphemeralStorage() - .endEphemeralStorage() .endKafka() .endSpec() .build(); @@ -261,15 +252,12 @@ public void testWatchWithSelector() throws InterruptedException { .endMetadata() .withNewSpec() .withNewKafka() - .withReplicas(1) .withListeners(new GenericKafkaListenerBuilder() .withName("plain") .withPort(9092) .withType(KafkaListenerType.INTERNAL) .withTls(false) .build()) - .withNewEphemeralStorage() - .endEphemeralStorage() .endKafka() .endSpec() .build(); diff --git a/cluster-operator/src/test/java/io/strimzi/operator/cluster/operator/assembly/StrimziPodSetControllerIT.java b/cluster-operator/src/test/java/io/strimzi/operator/cluster/operator/assembly/StrimziPodSetControllerIT.java index a9841929b6..cc95100377 100644 --- a/cluster-operator/src/test/java/io/strimzi/operator/cluster/operator/assembly/StrimziPodSetControllerIT.java +++ b/cluster-operator/src/test/java/io/strimzi/operator/cluster/operator/assembly/StrimziPodSetControllerIT.java @@ -162,21 +162,13 @@ private static Kafka kafka(String name, Map labels) { .endMetadata() .withNewSpec() .withNewKafka() - .withReplicas(3) .withListeners(new GenericKafkaListenerBuilder() .withName("plain") .withPort(9092) .withType(KafkaListenerType.INTERNAL) .withTls(false) .build()) - .withNewEphemeralStorage() - .endEphemeralStorage() .endKafka() - .withNewZookeeper() - .withReplicas(3) - .withNewEphemeralStorage() - .endEphemeralStorage() - .endZookeeper() .endSpec() .build(); } @@ -470,7 +462,7 @@ public void testPodUpdates(VertxTestContext context) { }, () -> context.failNow("Pod stats do not match")); - // Get resource version to double check the pod was not deleted + // Get resource version to double-check the pod was not deleted Pod initialPod = client.pods().inNamespace(NAMESPACE).withName(podName).get(); String resourceVersion = initialPod.getMetadata().getResourceVersion(); diff --git a/cluster-operator/src/test/java/io/strimzi/operator/cluster/operator/assembly/StrimziPodSetControllerMockTest.java b/cluster-operator/src/test/java/io/strimzi/operator/cluster/operator/assembly/StrimziPodSetControllerMockTest.java index 197e15c249..9784db432a 100644 --- a/cluster-operator/src/test/java/io/strimzi/operator/cluster/operator/assembly/StrimziPodSetControllerMockTest.java +++ b/cluster-operator/src/test/java/io/strimzi/operator/cluster/operator/assembly/StrimziPodSetControllerMockTest.java @@ -161,21 +161,13 @@ private static Kafka kafka(String namespace, String name, Map la .endMetadata() .withNewSpec() .withNewKafka() - .withReplicas(3) .withListeners(new GenericKafkaListenerBuilder() .withName("plain") .withPort(9092) .withType(KafkaListenerType.INTERNAL) .withTls(false) .build()) - .withNewEphemeralStorage() - .endEphemeralStorage() .endKafka() - .withNewZookeeper() - .withReplicas(3) - .withNewEphemeralStorage() - .endEphemeralStorage() - .endZookeeper() .endSpec() .build(); }