From 593400f77aa1070090c13dbd904aceb9c10ed167 Mon Sep 17 00:00:00 2001
From: Jakub Stejskal
Date: Thu, 31 Oct 2024 16:09:42 +0100
Subject: [PATCH] Add test that uses s3 (Minio) as a state backend

Signed-off-by: Jakub Stejskal
---
 .../streams/e2e/flink/sql/SqlJobRunnerST.md   |  26 +
 .../io/streams/constants/TestConstants.java   |   4 +
 .../io/streams/operands/minio/SetupMinio.java | 142 ++++
 .../java/io/streams/utils/MinioUtils.java     | 110 +++
 .../streams/e2e/flink/sql/SqlJobRunnerST.java | 676 ++++++++++++++----
 5 files changed, 812 insertions(+), 146 deletions(-)
 create mode 100644 src/main/java/io/streams/operands/minio/SetupMinio.java
 create mode 100644 src/main/java/io/streams/utils/MinioUtils.java

diff --git a/docs/io/streams/e2e/flink/sql/SqlJobRunnerST.md b/docs/io/streams/e2e/flink/sql/SqlJobRunnerST.md
index cc6e1d1..cee5f94 100644
--- a/docs/io/streams/e2e/flink/sql/SqlJobRunnerST.md
+++ b/docs/io/streams/e2e/flink/sql/SqlJobRunnerST.md
@@ -62,6 +62,32 @@
 * `flink` (description file doesn't exist)
 
 
+## testS3StateBackend
+
+**Description:** Test verifies that a user can use S3 as a state backend
+
+**Steps:**
+
+| Step | Action | Result |
+| - | - | - |
+| 1. | Create namespace, serviceaccount and roles for Flink | Resources created |
+| 2. | Deploy Apicurio registry | Apicurio registry is up and running |
+| 3. | Deploy Kafka my-cluster with scram-sha auth | Kafka is up and running |
+| 4. | Create KafkaUser with scram-sha secret | KafkaUser created |
+| 5. | Deploy strimzi-kafka-clients producer with payment data generator | Client job is created and data are sent to flink.payment.data topic |
+| 6. | Deploy Minio for S3 service | Minio is up and running |
+| 7. | Deploy FlinkDeployment with SQL which reads data from flink.payment.data topic, filters payments of type paypal and sends them to flink.payment.paypal topic; the secret created by the KafkaUser is used for authentication and is passed in via secret interpolation. Flink is configured to use S3 as a state backend | FlinkDeployment is up, tasks are deployed and it sends filtered data into flink.payment.paypal topic; the task manager deployed by the FlinkDeployment uses S3 |
+| 8. | Deploy strimzi-kafka-clients consumer as job and consume messages from kafka topic flink.payment.paypal | Consumer is deployed and it consumes messages |
+| 9. | Verify that messages are present | Messages are present |
+| 10. | Verify that taskmanager logs contain 'State backend loader loads the state backend as EmbeddedRocksDBStateBackend' | Log message is present |
+| 11. | Verify that Minio contains some data from Flink | Flink bucket is not empty |
+
+**Labels:**
+
+* `flink-sql-runner` (description file doesn't exist)
+* `flink` (description file doesn't exist)
+
+
 ## testSimpleFilter
 
 **Description:** Test verifies sql-runner.jar works integrated with kafka, apicurio and uses scram-sha for kafka authentication
 
diff --git a/src/main/java/io/streams/constants/TestConstants.java b/src/main/java/io/streams/constants/TestConstants.java
index 590a4dd..0894a8a 100644
--- a/src/main/java/io/streams/constants/TestConstants.java
+++ b/src/main/java/io/streams/constants/TestConstants.java
@@ -20,4 +20,8 @@ public interface TestConstants {
 
     String STRIMZI_TEST_CLIENTS_LABEL_KEY = "strimzi-test-clients";
     String STRIMZI_TEST_CLIENTS_LABEL_VALUE = "true";
+
+    // Labels
+    String APP_POD_LABEL = "app";
+    String DEPLOYMENT_TYPE = "deployment-type";
 }

diff --git a/src/main/java/io/streams/operands/minio/SetupMinio.java b/src/main/java/io/streams/operands/minio/SetupMinio.java
new file mode 100644
index 0000000..9637247
--- /dev/null
+++ b/src/main/java/io/streams/operands/minio/SetupMinio.java
@@ -0,0 +1,142 @@
+/*
+ * Copyright streamshub authors.
+ * License: Apache License 2.0 (see the file LICENSE or http://apache.org/licenses/LICENSE-2.0.html).
+ */
+package io.streams.operands.minio;
+
+import io.fabric8.kubernetes.api.model.EnvVar;
+import io.fabric8.kubernetes.api.model.IntOrString;
+import io.fabric8.kubernetes.api.model.LabelSelector;
+import io.fabric8.kubernetes.api.model.LabelSelectorBuilder;
+import io.fabric8.kubernetes.api.model.Service;
+import io.fabric8.kubernetes.api.model.ServiceBuilder;
+import io.fabric8.kubernetes.api.model.VolumeBuilder;
+import io.fabric8.kubernetes.api.model.VolumeMountBuilder;
+import io.fabric8.kubernetes.api.model.apps.Deployment;
+import io.fabric8.kubernetes.api.model.apps.DeploymentBuilder;
+import io.skodjob.testframe.resources.KubeResourceManager;
+import io.streams.constants.TestConstants;
+import org.apache.logging.log4j.LogManager;
+import org.apache.logging.log4j.Logger;
+
+import java.util.Map;
+
+public class SetupMinio {
+    private static final Logger LOGGER = LogManager.getLogger(SetupMinio.class);
+
+    public static final String MINIO = "minio";
+    public static final String ADMIN_CREDS = "minioadminLongerThan16BytesForFIPS";
+    public static final String MINIO_STORAGE_ALIAS = "local";
+    public static final int MINIO_PORT = 9000;
+    public static final int MINIO_CONSOLE_PORT = 9090;
+    private static final String MINIO_IMAGE = "quay.io/minio/minio:latest";
+
+    /**
+     * Deploys Minio to a specific namespace, creates a service for it and initializes the client inside the Minio pod.
+     * @param namespace namespace where Minio will be installed
+     */
+    public static void deployMinio(String namespace) {
+        // Create a Minio deployment
+        Deployment minioDeployment = new DeploymentBuilder()
+            .withNewMetadata()
+                .withName(MINIO)
+                .withNamespace(namespace)
+                .withLabels(Map.of(TestConstants.DEPLOYMENT_TYPE, MINIO))
+            .endMetadata()
+            .withNewSpec()
+                .withReplicas(1)
+                .withNewSelector()
+                    .withMatchLabels(Map.of(TestConstants.APP_POD_LABEL, MINIO))
+                .endSelector()
+                .withNewTemplate()
+                    .withNewMetadata()
+                        .withLabels(Map.of(TestConstants.APP_POD_LABEL, MINIO))
+                    .endMetadata()
+                    .withNewSpec()
+                        .addNewContainer()
+                            .withName(MINIO)
+                            .withImage(MINIO_IMAGE)
+                            .withArgs("server", "/data", "--console-address", ":" + MINIO_CONSOLE_PORT)
+                            .addToEnv(new EnvVar("MINIO_ROOT_USER", ADMIN_CREDS, null))
+                            .addToEnv(new EnvVar("MINIO_ROOT_PASSWORD",
ADMIN_CREDS, null)) + .addNewPort() + .withContainerPort(MINIO_PORT) + .endPort() + .withVolumeMounts(new VolumeMountBuilder() + .withName("minio-storage") + .withMountPath("/data") + .build()) + .endContainer() + .withVolumes(new VolumeBuilder() + .withName("minio-storage") + .withNewEmptyDir() + .endEmptyDir() + .build()) + .endSpec() + .endTemplate() + .endSpec() + .build(); + + // Create the deployment + KubeResourceManager.getInstance().createResourceWithWait(minioDeployment); + + // Create a service to expose Minio + Service minioService = new ServiceBuilder() + .withNewMetadata() + .withName(MINIO) + .withNamespace(namespace) + .endMetadata() + .withNewSpec() + .withSelector(Map.of(TestConstants.APP_POD_LABEL, MINIO)) + .addNewPort() + .withName("api") + .withPort(MINIO_PORT) + .withTargetPort(new IntOrString(MINIO_PORT)) + .endPort() + .addNewPort() + .withName("console") + .withPort(MINIO_CONSOLE_PORT) + .withTargetPort(new IntOrString(MINIO_CONSOLE_PORT)) + .endPort() + .endSpec() + .build(); + + KubeResourceManager.getInstance().createResourceWithoutWait(minioService); + // NetworkPolicyResource.allowNetworkPolicyAllIngressForMatchingLabel(namespace, MINIO, Map.of(TestConstants.APP_POD_LABEL, MINIO)); + + initMinioClient(namespace); + } + + /** + * Init client inside the Minio pod. This allows other commands to be executed during the tests. + * @param namespace where Minio is installed + */ + private static void initMinioClient(String namespace) { + final LabelSelector labelSelector = new LabelSelectorBuilder().withMatchLabels(Map.of(TestConstants.APP_POD_LABEL, MINIO)).build(); + final String minioPod = KubeResourceManager.getKubeClient().listPods(namespace, labelSelector).get(0).getMetadata().getName(); + + KubeResourceManager.getKubeCmdClient().inNamespace(namespace).execInPod(minioPod, + "mc", + "config", + "host", + "add", + MINIO_STORAGE_ALIAS, + "http://localhost:" + MINIO_PORT, + ADMIN_CREDS, ADMIN_CREDS); + } + + /** + * Create bucket in Minio instance in specific namespace. + * @param namespace Minio location + * @param bucketName name of the bucket that will be created and used within the tests + */ + public static void createBucket(String namespace, String bucketName) { + final LabelSelector labelSelector = new LabelSelectorBuilder().withMatchLabels(Map.of(TestConstants.APP_POD_LABEL, MINIO)).build(); + final String minioPod = KubeResourceManager.getKubeClient().listPods(namespace, labelSelector).get(0).getMetadata().getName(); + + KubeResourceManager.getKubeCmdClient().inNamespace(namespace).execInPod(minioPod, + "mc", + "mb", + MINIO_STORAGE_ALIAS + "/" + bucketName); + } +} diff --git a/src/main/java/io/streams/utils/MinioUtils.java b/src/main/java/io/streams/utils/MinioUtils.java new file mode 100644 index 0000000..54c92e7 --- /dev/null +++ b/src/main/java/io/streams/utils/MinioUtils.java @@ -0,0 +1,110 @@ +/* + * Copyright streamshub authors. + * License: Apache License 2.0 (see the file LICENSE or http://apache.org/licenses/LICENSE-2.0.html). 
+ */
+package io.streams.utils;
+
+import io.fabric8.kubernetes.api.model.LabelSelector;
+import io.fabric8.kubernetes.api.model.LabelSelectorBuilder;
+import io.skodjob.testframe.TestFrameConstants;
+import io.skodjob.testframe.resources.KubeResourceManager;
+import io.skodjob.testframe.wait.Wait;
+import io.streams.constants.TestConstants;
+import io.streams.operands.minio.SetupMinio;
+import org.apache.logging.log4j.LogManager;
+import org.apache.logging.log4j.Logger;
+
+import java.util.Map;
+import java.util.regex.Matcher;
+import java.util.regex.Pattern;
+
+public class MinioUtils {
+    private static final Logger LOGGER = LogManager.getLogger(MinioUtils.class);
+
+    private MinioUtils() {
+
+    }
+
+    /**
+     * Collect data from Minio about usage of a specific bucket
+     *
+     * @param namespace  Name of the Namespace where the Minio Pod is running
+     * @param bucketName Name of the bucket for which we want to get info about its size
+     * @return Overall statistics about the bucket in String format
+     */
+    public static String getBucketSizeInfo(String namespace, String bucketName) {
+        final LabelSelector labelSelector = new LabelSelectorBuilder()
+            .withMatchLabels(Map.of(TestConstants.APP_POD_LABEL, SetupMinio.MINIO))
+            .build();
+        final String minioPod = KubeResourceManager.getKubeClient()
+            .listPods(namespace, labelSelector)
+            .get(0)
+            .getMetadata()
+            .getName();
+
+        return KubeResourceManager.getKubeCmdClient()
+            .inNamespace(namespace)
+            .execInPod(minioPod,
+                "mc",
+                "stat",
+                "local/" + bucketName)
+            .out();
+
+    }
+
+    /**
+     * Parse out total size of bucket from the information about usage.
+     *
+     * @param bucketInfo String containing all stat info about bucket
+     * @return Map consisting of the parsed size and its unit
+     */
+    private static Map<String, Object> parseTotalSize(String bucketInfo) {
+        Pattern pattern = Pattern.compile("Total size:\\s*(?<size>[\\d.]+)\\s*(?<unit>.*)");
+        Matcher matcher = pattern.matcher(bucketInfo);
+
+        if (matcher.find()) {
+            return Map.of("size", Double.parseDouble(matcher.group("size")), "unit", matcher.group("unit"));
+        } else {
+            throw new IllegalArgumentException("Total size not found in the provided string");
+        }
+    }
+
+    /**
+     * Wait until size of the bucket is not 0 B.
+     *
+     * @param namespace  Minio location
+     * @param bucketName bucket name
+     */
+    public static void waitForDataInMinio(String namespace, String bucketName) {
+        Wait.until("data sync from Kafka to Minio",
+            TestFrameConstants.GLOBAL_POLL_INTERVAL_MEDIUM,
+            TestFrameConstants.GLOBAL_TIMEOUT,
+            () -> {
+                String bucketSizeInfo = getBucketSizeInfo(namespace, bucketName);
+                Map<String, Object> parsedSize = parseTotalSize(bucketSizeInfo);
+                double bucketSize = (Double) parsedSize.get("size");
+                LOGGER.info("Collected bucket size: {} {}", bucketSize, parsedSize.get("unit"));
+                LOGGER.debug("Collected bucket info:\n{}", bucketSizeInfo);
+
+                return bucketSize > 0;
+            });
+    }
+
+    /**
+     * Wait until size of the bucket is 0 B.
+     *
+     * @param namespace  Minio location
+     * @param bucketName bucket name
+     */
+    public static void waitForNoDataInMinio(String namespace, String bucketName) {
+        Wait.until("data deletion in Minio", TestFrameConstants.GLOBAL_POLL_INTERVAL_MEDIUM, TestFrameConstants.GLOBAL_TIMEOUT, () -> {
+            String bucketSizeInfo = getBucketSizeInfo(namespace, bucketName);
+            Map<String, Object> parsedSize = parseTotalSize(bucketSizeInfo);
+            double bucketSize = (Double) parsedSize.get("size");
+            LOGGER.info("Collected bucket size: {} {}", bucketSize, parsedSize.get("unit"));
+            LOGGER.debug("Collected bucket info:\n{}", bucketSizeInfo);
+
+            return bucketSize == 0;
+        });
+    }
+}

diff --git a/src/test/java/io/streams/e2e/flink/sql/SqlJobRunnerST.java b/src/test/java/io/streams/e2e/flink/sql/SqlJobRunnerST.java
index 7d247c7..824f2d1 100644
--- a/src/test/java/io/streams/e2e/flink/sql/SqlJobRunnerST.java
+++ b/src/test/java/io/streams/e2e/flink/sql/SqlJobRunnerST.java
@@ -24,6 +24,7 @@
 import io.streams.operands.flink.resoruces.FlinkDeploymentType;
 import io.streams.operands.flink.templates.FlinkDeploymentTemplate;
 import io.streams.operands.flink.templates.FlinkRBAC;
+import io.streams.operands.minio.SetupMinio;
 import io.streams.operands.strimzi.resources.KafkaType;
 import io.streams.operands.strimzi.templates.KafkaNodePoolTemplate;
 import io.streams.operands.strimzi.templates.KafkaTemplate;
@@ -31,6 +32,7 @@
 import io.streams.operators.EOperator;
 import io.streams.operators.OperatorInstaller;
 import io.streams.sql.TestStatements;
+import io.streams.utils.MinioUtils;
 import io.streams.utils.StrimziClientUtils;
 import io.streams.utils.TestUtils;
 import io.streams.utils.kube.JobUtils;
@@ -44,6 +46,7 @@
 import org.junit.jupiter.api.Tag;
 import org.junit.jupiter.api.Test;
 
+import java.util.HashMap;
 import java.util.List;
 import java.util.Map;
 
@@ -108,68 +111,105 @@ void testSimpleFilter() {
         String namespace = "flink-filter";
         String kafkaUser = "test-user";
         // Create namespace
-        KubeResourceManager.getInstance().createOrUpdateResourceWithWait(
-            new NamespaceBuilder().withNewMetadata().withName(namespace).endMetadata().build());
+        KubeResourceManager.getInstance()
+            .createOrUpdateResourceWithWait(
+                new NamespaceBuilder().withNewMetadata()
+                    .withName(namespace)
+                    .endMetadata()
+                    .build());
 
         // Add flink RBAC
-        KubeResourceManager.getInstance().createOrUpdateResourceWithWait(
-            FlinkRBAC.getFlinkRbacResources(namespace).toArray(new HasMetadata[0]));
+        KubeResourceManager.getInstance()
+            .createOrUpdateResourceWithWait(
+                FlinkRBAC.getFlinkRbacResources(namespace)
+                    .toArray(new HasMetadata[0]));
 
         // Create kafka
-        KubeResourceManager.getInstance().createOrUpdateResourceWithWait(
-            KafkaNodePoolTemplate.defaultKafkaNodePoolJbod(namespace, "dual-role",
-                3, kafkaClusterName, List.of(ProcessRoles.BROKER, ProcessRoles.CONTROLLER)).build());
-
-        KubeResourceManager.getInstance().createOrUpdateResourceWithWait(
-            KafkaTemplate.defaultKafka(namespace, kafkaClusterName)
-                .editSpec()
-                .editKafka()
-                .withListeners(
-                    new GenericKafkaListenerBuilder()
-                        .withName("plain")
-                        .withTls(false)
-                        .withType(KafkaListenerType.INTERNAL)
-                        .withPort((9092))
-                        .withAuth(new KafkaListenerAuthenticationScramSha512())
-                        .build(),
-                    new GenericKafkaListenerBuilder()
-                        .withName("unsecure")
-                        .withTls(false)
-                        .withType(KafkaListenerType.INTERNAL)
-                        .withPort((9094))
-                        .build()
-                )
-                .endKafka()
-                .endSpec()
-                .build());
+        KubeResourceManager.getInstance()
+            .createOrUpdateResourceWithWait(
+                KafkaNodePoolTemplate.defaultKafkaNodePoolJbod(namespace,
"dual-role", + 3, kafkaClusterName, List.of(ProcessRoles.BROKER, ProcessRoles.CONTROLLER)) + .build()); + + KubeResourceManager.getInstance() + .createOrUpdateResourceWithWait( + KafkaTemplate.defaultKafka(namespace, kafkaClusterName) + .editSpec() + .editKafka() + .withListeners( + new GenericKafkaListenerBuilder() + .withName("plain") + .withTls(false) + .withType(KafkaListenerType.INTERNAL) + .withPort((9092)) + .withAuth(new KafkaListenerAuthenticationScramSha512()) + .build(), + new GenericKafkaListenerBuilder() + .withName("unsecure") + .withTls(false) + .withType(KafkaListenerType.INTERNAL) + .withPort((9094)) + .build() + ) + .endKafka() + .endSpec() + .build()); // Create topic for ksql apicurio - KubeResourceManager.getInstance().createOrUpdateResourceWithWait( - ApicurioRegistryTemplate.apicurioKsqlTopic(namespace, kafkaClusterName, 3)); + KubeResourceManager.getInstance() + .createOrUpdateResourceWithWait( + ApicurioRegistryTemplate.apicurioKsqlTopic(namespace, kafkaClusterName, 3)); // Create kafka scram sha user - KubeResourceManager.getInstance().createOrUpdateResourceWithWait( - KafkaUserTemplate.defaultKafkaUser(namespace, kafkaUser, kafkaClusterName) - .editSpec() - .withAuthentication(new KafkaUserScramSha512ClientAuthentication()) - .endSpec() - .build()); - - String bootstrapServerAuth = KafkaType.kafkaClient().inNamespace(namespace).withName(kafkaClusterName).get() - .getStatus().getListeners().stream().filter(l -> l.getName().equals("plain")) - .findFirst().get().getBootstrapServers(); - String bootstrapServerUnsecure = KafkaType.kafkaClient().inNamespace(namespace).withName(kafkaClusterName).get() - .getStatus().getListeners().stream().filter(l -> l.getName().equals("unsecure")) - .findFirst().get().getBootstrapServers(); + KubeResourceManager.getInstance() + .createOrUpdateResourceWithWait( + KafkaUserTemplate.defaultKafkaUser(namespace, kafkaUser, kafkaClusterName) + .editSpec() + .withAuthentication(new KafkaUserScramSha512ClientAuthentication()) + .endSpec() + .build()); + + String bootstrapServerAuth = KafkaType.kafkaClient() + .inNamespace(namespace) + .withName(kafkaClusterName) + .get() + .getStatus() + .getListeners() + .stream() + .filter(l -> l.getName() + .equals("plain")) + .findFirst() + .get() + .getBootstrapServers(); + String bootstrapServerUnsecure = KafkaType.kafkaClient() + .inNamespace(namespace) + .withName(kafkaClusterName) + .get() + .getStatus() + .getListeners() + .stream() + .filter(l -> l.getName() + .equals("unsecure")) + .findFirst() + .get() + .getBootstrapServers(); // Add apicurio - KubeResourceManager.getInstance().createOrUpdateResourceWithWait( - ApicurioRegistryTemplate.defaultApicurioRegistry("apicurio-registry", namespace, - bootstrapServerUnsecure).build()); + KubeResourceManager.getInstance() + .createOrUpdateResourceWithWait( + ApicurioRegistryTemplate.defaultApicurioRegistry("apicurio-registry", namespace, + bootstrapServerUnsecure) + .build()); // Get user secret jaas configuration - final String saslJaasConfigEncrypted = KubeResourceManager.getKubeClient().getClient().secrets() - .inNamespace(namespace).withName(kafkaUser).get().getData().get("sasl.jaas.config"); + final String saslJaasConfigEncrypted = KubeResourceManager.getKubeClient() + .getClient() + .secrets() + .inNamespace(namespace) + .withName(kafkaUser) + .get() + .getData() + .get("sasl.jaas.config"); final String saslJaasConfigDecrypted = TestUtils.decodeFromBase64(saslJaasConfigEncrypted); // Run internal producer and produce data @@ -192,9 +232,10 @@ void 
testSimpleFilter() { ) .build(); - KubeResourceManager.getInstance().createResourceWithWait( - kafkaProducerClient.producerStrimzi() - ); + KubeResourceManager.getInstance() + .createResourceWithWait( + kafkaProducerClient.producerStrimzi() + ); String registryUrl = "http://apicurio-registry-service." + namespace + ".svc:8080/apis/ccompat/v6"; @@ -203,7 +244,8 @@ void testSimpleFilter() { "flink-filter", List.of(TestStatements.getTestFlinkFilter( bootstrapServerAuth, registryUrl, kafkaUser, namespace))) .build(); - KubeResourceManager.getInstance().createOrUpdateResourceWithWait(flink); + KubeResourceManager.getInstance() + .createOrUpdateResourceWithWait(flink); JobUtils.waitForJobSuccess(namespace, kafkaProducerClient.getProducerName(), TestFrameConstants.GLOBAL_TIMEOUT_MEDIUM); @@ -221,17 +263,23 @@ void testSimpleFilter() { "security.protocol=SASL_PLAINTEXT\n" + "sasl.jaas.config=" + saslJaasConfigDecrypted ) - .withConsumerGroup("flink-filter-test-group").build(); + .withConsumerGroup("flink-filter-test-group") + .build(); - KubeResourceManager.getInstance().createResourceWithWait( - kafkaConsumerClient.consumerStrimzi() - ); + KubeResourceManager.getInstance() + .createResourceWithWait( + kafkaConsumerClient.consumerStrimzi() + ); JobUtils.waitForJobSuccess(namespace, kafkaConsumerClient.getConsumerName(), TestFrameConstants.GLOBAL_TIMEOUT_MEDIUM); - String consumerPodName = KubeResourceManager.getKubeClient().listPodsByPrefixInName(namespace, consumerName) - .get(0).getMetadata().getName(); - String log = KubeResourceManager.getKubeClient().getLogsFromPod(namespace, consumerPodName); + String consumerPodName = KubeResourceManager.getKubeClient() + .listPodsByPrefixInName(namespace, consumerName) + .get(0) + .getMetadata() + .getName(); + String log = KubeResourceManager.getKubeClient() + .getLogsFromPod(namespace, consumerPodName); assertTrue(log.contains("\"type\":\"paypal\"")); assertFalse(log.contains("\"type\":\"creditCard\"")); } @@ -257,34 +305,49 @@ void testBadSqlStatement() { String flinkDeploymentName = namespace; // Create namespace - KubeResourceManager.getInstance().createOrUpdateResourceWithWait( - new NamespaceBuilder().withNewMetadata().withName(namespace).endMetadata().build()); + KubeResourceManager.getInstance() + .createOrUpdateResourceWithWait( + new NamespaceBuilder().withNewMetadata() + .withName(namespace) + .endMetadata() + .build()); // Add flink RBAC - KubeResourceManager.getInstance().createOrUpdateResourceWithWait( - FlinkRBAC.getFlinkRbacResources(namespace).toArray(new HasMetadata[0])); + KubeResourceManager.getInstance() + .createOrUpdateResourceWithWait( + FlinkRBAC.getFlinkRbacResources(namespace) + .toArray(new HasMetadata[0])); // Deploy flink with not valid sql FlinkDeployment flink = FlinkDeploymentTemplate.defaultFlinkDeployment(namespace, flinkDeploymentName, List.of("blah blah")) .build(); - KubeResourceManager.getInstance().createOrUpdateResourceWithoutWait(flink); + KubeResourceManager.getInstance() + .createOrUpdateResourceWithoutWait(flink); // Check if no task is deployed and error is proper in flink deployment Wait.until("Flink deployment fail", TestFrameConstants.GLOBAL_POLL_INTERVAL_1_SEC, TestFrameConstants.GLOBAL_TIMEOUT_MEDIUM, () -> { - String error = new FlinkDeploymentType().getClient().inNamespace(namespace).withName(flinkDeploymentName) - .get().getStatus().getError(); + String error = new FlinkDeploymentType().getClient() + .inNamespace(namespace) + .withName(flinkDeploymentName) + .get() + .getStatus() + .getError(); return 
error.contains("DeploymentFailedException") || error.contains("ReconciliationException"); }); - String podName = KubeResourceManager.getKubeClient().listPodsByPrefixInName(namespace, flinkDeploymentName) - .get(0).getMetadata().getName(); + String podName = KubeResourceManager.getKubeClient() + .listPodsByPrefixInName(namespace, flinkDeploymentName) + .get(0) + .getMetadata() + .getName(); Wait.until("Flink deployment contains error message", TestFrameConstants.GLOBAL_POLL_INTERVAL_1_SEC, TestFrameConstants.GLOBAL_TIMEOUT_MEDIUM, () -> KubeResourceManager.getKubeClient() - .getLogsFromPod(namespace, podName).contains("SQL parse failed")); + .getLogsFromPod(namespace, podName) + .contains("SQL parse failed")); } @TestDoc( @@ -308,25 +371,38 @@ void testWrongConnectionInfo() { String flinkDeploymentName = namespace; // Create namespace - KubeResourceManager.getInstance().createOrUpdateResourceWithWait( - new NamespaceBuilder().withNewMetadata().withName(namespace).endMetadata().build()); + KubeResourceManager.getInstance() + .createOrUpdateResourceWithWait( + new NamespaceBuilder().withNewMetadata() + .withName(namespace) + .endMetadata() + .build()); // Add flink RBAC - KubeResourceManager.getInstance().createOrUpdateResourceWithWait( - FlinkRBAC.getFlinkRbacResources(namespace).toArray(new HasMetadata[0])); + KubeResourceManager.getInstance() + .createOrUpdateResourceWithWait( + FlinkRBAC.getFlinkRbacResources(namespace) + .toArray(new HasMetadata[0])); // Deploy flink with not valid sql FlinkDeployment flink = FlinkDeploymentTemplate.defaultFlinkDeployment(namespace, - flinkDeploymentName, List.of(TestStatements.getWrongConnectionSql())).build(); - KubeResourceManager.getInstance().createOrUpdateResourceWithoutWait(flink); + flinkDeploymentName, List.of(TestStatements.getWrongConnectionSql())) + .build(); + KubeResourceManager.getInstance() + .createOrUpdateResourceWithoutWait(flink); // Check if no task is deployed and error is proper in flink deployment Wait.until("Flink deployment starts", TestFrameConstants.GLOBAL_POLL_INTERVAL_1_SEC, TestFrameConstants.GLOBAL_TIMEOUT_MEDIUM, () -> - !KubeResourceManager.getKubeClient().listPodsByPrefixInName(namespace, flinkDeploymentName).isEmpty()); + !KubeResourceManager.getKubeClient() + .listPodsByPrefixInName(namespace, flinkDeploymentName) + .isEmpty()); - String podName = KubeResourceManager.getKubeClient().listPodsByPrefixInName(namespace, flinkDeploymentName) - .get(0).getMetadata().getName(); + String podName = KubeResourceManager.getKubeClient() + .listPodsByPrefixInName(namespace, flinkDeploymentName) + .get(0) + .getMetadata() + .getName(); Wait.until("Flink deployment contains error message", TestFrameConstants.GLOBAL_POLL_INTERVAL_1_SEC, TestFrameConstants.GLOBAL_TIMEOUT_MEDIUM, () -> @@ -367,70 +443,108 @@ void testWrongConnectionInfo() { @Test void testFRocksDbStateBackend() { String namespace = "flink-state-backend"; + String flinkDeploymentName = "flink-state-backend"; String kafkaUser = "test-user"; // Create namespace - KubeResourceManager.getInstance().createOrUpdateResourceWithWait( - new NamespaceBuilder().withNewMetadata().withName(namespace).endMetadata().build()); + KubeResourceManager.getInstance() + .createOrUpdateResourceWithWait( + new NamespaceBuilder().withNewMetadata() + .withName(namespace) + .endMetadata() + .build()); // Add flink RBAC - KubeResourceManager.getInstance().createOrUpdateResourceWithWait( - FlinkRBAC.getFlinkRbacResources(namespace).toArray(new HasMetadata[0])); + KubeResourceManager.getInstance() + 
.createOrUpdateResourceWithWait( + FlinkRBAC.getFlinkRbacResources(namespace) + .toArray(new HasMetadata[0])); // Create kafka - KubeResourceManager.getInstance().createOrUpdateResourceWithWait( - KafkaNodePoolTemplate.defaultKafkaNodePoolJbod(namespace, "dual-role", - 3, kafkaClusterName, List.of(ProcessRoles.BROKER, ProcessRoles.CONTROLLER)).build()); - - KubeResourceManager.getInstance().createOrUpdateResourceWithWait( - KafkaTemplate.defaultKafka(namespace, kafkaClusterName) - .editSpec() - .editKafka() - .withListeners( - new GenericKafkaListenerBuilder() - .withName("plain") - .withTls(false) - .withType(KafkaListenerType.INTERNAL) - .withPort((9092)) - .withAuth(new KafkaListenerAuthenticationScramSha512()) - .build(), - new GenericKafkaListenerBuilder() - .withName("unsecure") - .withTls(false) - .withType(KafkaListenerType.INTERNAL) - .withPort((9094)) - .build() - ) - .endKafka() - .endSpec() - .build()); + KubeResourceManager.getInstance() + .createOrUpdateResourceWithWait( + KafkaNodePoolTemplate.defaultKafkaNodePoolJbod(namespace, "dual-role", + 3, kafkaClusterName, List.of(ProcessRoles.BROKER, ProcessRoles.CONTROLLER)) + .build()); + + KubeResourceManager.getInstance() + .createOrUpdateResourceWithWait( + KafkaTemplate.defaultKafka(namespace, kafkaClusterName) + .editSpec() + .editKafka() + .withListeners( + new GenericKafkaListenerBuilder() + .withName("plain") + .withTls(false) + .withType(KafkaListenerType.INTERNAL) + .withPort((9092)) + .withAuth(new KafkaListenerAuthenticationScramSha512()) + .build(), + new GenericKafkaListenerBuilder() + .withName("unsecure") + .withTls(false) + .withType(KafkaListenerType.INTERNAL) + .withPort((9094)) + .build() + ) + .endKafka() + .endSpec() + .build()); // Create topic for ksql apicurio - KubeResourceManager.getInstance().createOrUpdateResourceWithWait( - ApicurioRegistryTemplate.apicurioKsqlTopic(namespace, kafkaClusterName, 3)); + KubeResourceManager.getInstance() + .createOrUpdateResourceWithWait( + ApicurioRegistryTemplate.apicurioKsqlTopic(namespace, kafkaClusterName, 3)); // Create kafka scram sha user - KubeResourceManager.getInstance().createOrUpdateResourceWithWait( - KafkaUserTemplate.defaultKafkaUser(namespace, kafkaUser, kafkaClusterName) - .editSpec() - .withAuthentication(new KafkaUserScramSha512ClientAuthentication()) - .endSpec() - .build()); - - String bootstrapServerAuth = KafkaType.kafkaClient().inNamespace(namespace).withName(kafkaClusterName).get() - .getStatus().getListeners().stream().filter(l -> l.getName().equals("plain")) - .findFirst().get().getBootstrapServers(); - String bootstrapServerUnsecure = KafkaType.kafkaClient().inNamespace(namespace).withName(kafkaClusterName).get() - .getStatus().getListeners().stream().filter(l -> l.getName().equals("unsecure")) - .findFirst().get().getBootstrapServers(); + KubeResourceManager.getInstance() + .createOrUpdateResourceWithWait( + KafkaUserTemplate.defaultKafkaUser(namespace, kafkaUser, kafkaClusterName) + .editSpec() + .withAuthentication(new KafkaUserScramSha512ClientAuthentication()) + .endSpec() + .build()); + + String bootstrapServerAuth = KafkaType.kafkaClient() + .inNamespace(namespace) + .withName(kafkaClusterName) + .get() + .getStatus() + .getListeners() + .stream() + .filter(l -> l.getName() + .equals("plain")) + .findFirst() + .get() + .getBootstrapServers(); + String bootstrapServerUnsecure = KafkaType.kafkaClient() + .inNamespace(namespace) + .withName(kafkaClusterName) + .get() + .getStatus() + .getListeners() + .stream() + .filter(l -> l.getName() 
+ .equals("unsecure")) + .findFirst() + .get() + .getBootstrapServers(); // Add apicurio - KubeResourceManager.getInstance().createOrUpdateResourceWithWait( - ApicurioRegistryTemplate.defaultApicurioRegistry("apicurio-registry", namespace, - bootstrapServerUnsecure).build()); + KubeResourceManager.getInstance() + .createOrUpdateResourceWithWait( + ApicurioRegistryTemplate.defaultApicurioRegistry("apicurio-registry", namespace, + bootstrapServerUnsecure) + .build()); // Get user secret jaas configuration - final String saslJaasConfigEncrypted = KubeResourceManager.getKubeClient().getClient().secrets() - .inNamespace(namespace).withName(kafkaUser).get().getData().get("sasl.jaas.config"); + final String saslJaasConfigEncrypted = KubeResourceManager.getKubeClient() + .getClient() + .secrets() + .inNamespace(namespace) + .withName(kafkaUser) + .get() + .getData() + .get("sasl.jaas.config"); final String saslJaasConfigDecrypted = TestUtils.decodeFromBase64(saslJaasConfigEncrypted); // Run internal producer and produce data @@ -453,9 +567,10 @@ void testFRocksDbStateBackend() { ) .build(); - KubeResourceManager.getInstance().createResourceWithWait( - kafkaProducerClient.producerStrimzi() - ); + KubeResourceManager.getInstance() + .createResourceWithWait( + kafkaProducerClient.producerStrimzi() + ); String registryUrl = "http://apicurio-registry-service." + namespace + ".svc:8080/apis/ccompat/v6"; @@ -463,12 +578,13 @@ void testFRocksDbStateBackend() { PersistentVolumeClaim flinkPVC = FlinkDeploymentTemplate .getFlinkPVC(namespace, "flink-state-backend") .build(); - KubeResourceManager.getInstance().createOrUpdateResourceWithWait(flinkPVC); + KubeResourceManager.getInstance() + .createOrUpdateResourceWithWait(flinkPVC); // Deploy flink with test filter sql statement which filter to specific topic only payment type paypal // Modify flink default deployment with state backend and pvc configuration FlinkDeployment flink = FlinkDeploymentTemplate.defaultFlinkDeployment(namespace, - "flink-state-backend", List.of(TestStatements.getTestFlinkFilter( + flinkDeploymentName, List.of(TestStatements.getTestFlinkFilter( bootstrapServerAuth, registryUrl, kafkaUser, namespace))) .editSpec() .addToFlinkConfiguration( @@ -493,14 +609,272 @@ void testFRocksDbStateBackend() { .addNewVolume() .withName("flink-state-store") .withNewPersistentVolumeClaim() - .withClaimName(flinkPVC.getMetadata().getName()) + .withClaimName(flinkPVC.getMetadata() + .getName()) .endFlinkdeploymentspecPersistentVolumeClaim() .endFlinkdeploymentspecVolume() .endFlinkdeploymentspecSpec() .endPodTemplate() .endSpec() .build(); - KubeResourceManager.getInstance().createOrUpdateResourceWithWait(flink); + KubeResourceManager.getInstance() + .createOrUpdateResourceWithWait(flink); + + JobUtils.waitForJobSuccess(namespace, kafkaProducerClient.getProducerName(), + TestFrameConstants.GLOBAL_TIMEOUT_MEDIUM); + + //Check task manager log for presence rocksbd configuration + Wait.until("Task manager contains info about rocksdb", TestFrameConstants.GLOBAL_POLL_INTERVAL_LONG, + TestFrameConstants.GLOBAL_TIMEOUT, () -> { + List taskManagerPods = KubeResourceManager.getKubeClient() + .listPodsByPrefixInName(namespace, flinkDeploymentName + "-taskmanager"); + for (Pod p : taskManagerPods) { + return KubeResourceManager.getKubeClient() + .getLogsFromPod(namespace, p.getMetadata() + .getName()) + .contains("State backend loader loads the state backend as EmbeddedRocksDBStateBackend"); + } + return false; + }); + + // Run consumer and check if data are 
+        String consumerName = "kafka-consumer";
+        StrimziKafkaClients kafkaConsumerClient = new StrimziKafkaClientsBuilder()
+            .withConsumerName(consumerName)
+            .withNamespaceName(namespace)
+            .withTopicName("flink.payment.paypal")
+            .withBootstrapAddress(bootstrapServerAuth)
+            .withMessageCount(10)
+            .withAdditionalConfig(
+                "sasl.mechanism=SCRAM-SHA-512\n" +
+                    "security.protocol=SASL_PLAINTEXT\n" +
+                    "sasl.jaas.config=" + saslJaasConfigDecrypted
+            )
+            .withConsumerGroup("flink-filter-test-group")
+            .build();
+
+        KubeResourceManager.getInstance()
+            .createResourceWithWait(
+                kafkaConsumerClient.consumerStrimzi()
+            );
+
+        JobUtils.waitForJobSuccess(namespace, kafkaConsumerClient.getConsumerName(),
+            TestFrameConstants.GLOBAL_TIMEOUT_MEDIUM);
+        String consumerPodName = KubeResourceManager.getKubeClient()
+            .listPodsByPrefixInName(namespace, consumerName)
+            .get(0)
+            .getMetadata()
+            .getName();
+        String log = KubeResourceManager.getKubeClient()
+            .getLogsFromPod(namespace, consumerPodName);
+        assertTrue(log.contains("\"type\":\"paypal\""));
+        assertFalse(log.contains("\"type\":\"creditCard\""));
+    }
+
+    @TestDoc(
+        description = @Desc("Test verifies that a user can use S3 as a state backend"),
+        steps = {
+            @Step(value = "Create namespace, serviceaccount and roles for Flink", expected = "Resources created"),
+            @Step(value = "Deploy Apicurio registry", expected = "Apicurio registry is up and running"),
+            @Step(value = "Deploy Kafka my-cluster with scram-sha auth", expected = "Kafka is up and running"),
+            @Step(value = "Create KafkaUser with scram-sha secret", expected = "KafkaUser created"),
+            @Step(value = "Deploy strimzi-kafka-clients producer with payment data generator",
+                expected = "Client job is created and data are sent to flink.payment.data topic"),
+            @Step(value = "Deploy Minio for S3 service", expected = "Minio is up and running"),
+            @Step(value = "Deploy FlinkDeployment with SQL which reads data from flink.payment.data topic, " +
+                "filters payments of type paypal and sends them to flink.payment.paypal topic; the secret " +
+                "created by the KafkaUser is used for authentication and is passed in via secret interpolation. " +
+                "Flink is configured to use S3 as a state backend",
+                expected = "FlinkDeployment is up, tasks are deployed and it sends filtered " +
+                    "data into flink.payment.paypal topic; the task manager deployed by the FlinkDeployment " +
+                    "uses S3"),
Flink is " + + "configured to use S3 as a state backend", + expected = "FlinkDeployment is up and tasks are deployed and it sends filtered " + + "data into flink.payment.paypal topic, task manager deployed by FlinkDeployment uses " + + "S3"), + @Step(value = "Deploy strimzi-kafka-clients consumer as job and consume messages from" + + "kafka topic flink.payment.paypal", + expected = "Consumer is deployed and it consumes messages"), + @Step(value = "Verify that messages are present", expected = "Messages are present"), + @Step(value = "Verify that taskmanager logs contains 'State backend loader loads the state " + + "backend as EmbeddedRocksDBStateBackend'", expected = "Log message is present"), + @Step(value = "Verify that Minio contains some data from Flink", expected = "Flink bucket is not empty") + }, + labels = { + @Label(value = FLINK_SQL_RUNNER), + @Label(value = FLINK), + } + ) + @Test + void testS3StateBackend() { + String namespace = "flink-s3-state-backend"; + String flinkDeploymentName = "flink-state-backend"; + String kafkaUser = "test-user"; + String bucketName = "flink-bucket"; + + // Create namespace + KubeResourceManager.getInstance() + .createOrUpdateResourceWithWait( + new NamespaceBuilder().withNewMetadata() + .withName(namespace) + .endMetadata() + .build()); + + // Add flink RBAC + KubeResourceManager.getInstance() + .createOrUpdateResourceWithWait( + FlinkRBAC.getFlinkRbacResources(namespace) + .toArray(new HasMetadata[0])); + + // Create kafka + KubeResourceManager.getInstance() + .createOrUpdateResourceWithWait( + KafkaNodePoolTemplate.defaultKafkaNodePoolJbod(namespace, "dual-role", + 3, kafkaClusterName, List.of(ProcessRoles.BROKER, ProcessRoles.CONTROLLER)) + .build()); + + KubeResourceManager.getInstance() + .createOrUpdateResourceWithWait( + KafkaTemplate.defaultKafka(namespace, kafkaClusterName) + .editSpec() + .editKafka() + .withListeners( + new GenericKafkaListenerBuilder() + .withName("plain") + .withTls(false) + .withType(KafkaListenerType.INTERNAL) + .withPort((9092)) + .withAuth(new KafkaListenerAuthenticationScramSha512()) + .build(), + new GenericKafkaListenerBuilder() + .withName("unsecure") + .withTls(false) + .withType(KafkaListenerType.INTERNAL) + .withPort((9094)) + .build() + ) + .endKafka() + .endSpec() + .build()); + + // Create topic for ksql apicurio + KubeResourceManager.getInstance() + .createOrUpdateResourceWithWait( + ApicurioRegistryTemplate.apicurioKsqlTopic(namespace, kafkaClusterName, 3)); + + // Create kafka scram sha user + KubeResourceManager.getInstance() + .createOrUpdateResourceWithWait( + KafkaUserTemplate.defaultKafkaUser(namespace, kafkaUser, kafkaClusterName) + .editSpec() + .withAuthentication(new KafkaUserScramSha512ClientAuthentication()) + .endSpec() + .build()); + + String bootstrapServerAuth = KafkaType.kafkaClient() + .inNamespace(namespace) + .withName(kafkaClusterName) + .get() + .getStatus() + .getListeners() + .stream() + .filter(l -> l.getName() + .equals("plain")) + .findFirst() + .get() + .getBootstrapServers(); + String bootstrapServerUnsecure = KafkaType.kafkaClient() + .inNamespace(namespace) + .withName(kafkaClusterName) + .get() + .getStatus() + .getListeners() + .stream() + .filter(l -> l.getName() + .equals("unsecure")) + .findFirst() + .get() + .getBootstrapServers(); + + // Add apicurio + KubeResourceManager.getInstance() + .createOrUpdateResourceWithWait( + ApicurioRegistryTemplate.defaultApicurioRegistry("apicurio-registry", namespace, + bootstrapServerUnsecure) + .build()); + + // Get user secret 
+        final String saslJaasConfigEncrypted = KubeResourceManager.getKubeClient()
+            .getClient()
+            .secrets()
+            .inNamespace(namespace)
+            .withName(kafkaUser)
+            .get()
+            .getData()
+            .get("sasl.jaas.config");
+        final String saslJaasConfigDecrypted = TestUtils.decodeFromBase64(saslJaasConfigEncrypted);
+
+        // Run internal producer and produce data
+        String producerName = "kafka-producer";
+        StrimziKafkaClients kafkaProducerClient = new StrimziKafkaClientsBuilder()
+            .withProducerName(producerName)
+            .withNamespaceName(namespace)
+            .withTopicName("flink.payment.data")
+            .withBootstrapAddress(bootstrapServerAuth)
+            .withMessageCount(10000)
+            .withUsername(kafkaUser)
+            .withDelayMs(10)
+            .withMessageTemplate("payment_fiat")
+            .withAdditionalConfig(
+                StrimziClientUtils.getApicurioAdditionalProperties(AvroKafkaSerializer.class.getName(),
+                    "http://apicurio-registry-service." + namespace + ".svc:8080/apis/registry/v2") + "\n" +
+                    "sasl.mechanism=SCRAM-SHA-512\n" +
+                    "security.protocol=SASL_PLAINTEXT\n" +
+                    "sasl.jaas.config=" + saslJaasConfigDecrypted
+            )
+            .build();
+
+        KubeResourceManager.getInstance()
+            .createResourceWithWait(
+                kafkaProducerClient.producerStrimzi()
+            );
+
+        String registryUrl = "http://apicurio-registry-service." + namespace + ".svc:8080/apis/ccompat/v6";
+
+        // Add Minio
+        SetupMinio.deployMinio(namespace);
+        SetupMinio.createBucket(namespace, bucketName);
+
+        // Deploy flink with test filter sql statement which filters only payments of type paypal to a specific topic
+        // Modify flink default deployment with state backend and s3 configuration
+        HashMap<String, String> flinkConfig = new HashMap<>();
+        flinkConfig.put("execution.checkpointing.interval", "60000");
+        flinkConfig.put("execution.checkpointing.snapshot-compression", "true");
+        flinkConfig.put("kubernetes.operator.job.restart.failed", "true");
+        flinkConfig.put("state.backend.rocksdb.compression.per.level_FLINK_JIRA", "SNAPPY_COMPRESSION");
+        // rocksdb can be used as a state backend, but the location is referenced in s3 instead of on a local pvc
+        flinkConfig.put("state.backend.type", "rocksdb");
+        flinkConfig.put("state.checkpoints.dir", "s3://" + bucketName + "/" + SetupMinio.MINIO + ":" + SetupMinio.MINIO_PORT);
+        flinkConfig.put("state.savepoints.dir", "s3://" + bucketName + "/" + SetupMinio.MINIO + ":" + SetupMinio.MINIO_PORT);
+        // Currently Minio is deployed only in HTTP mode, so we need to specify http in the url
+        flinkConfig.put("s3.endpoint", "http://" + SetupMinio.MINIO + ":" + SetupMinio.MINIO_PORT);
+        flinkConfig.put("s3.path.style.access", "true");
+        flinkConfig.put("s3.access-key", SetupMinio.ADMIN_CREDS);
+        flinkConfig.put("s3.secret-key", SetupMinio.ADMIN_CREDS);
+
+        FlinkDeployment flink = FlinkDeploymentTemplate.defaultFlinkDeployment(namespace,
+            flinkDeploymentName, List.of(TestStatements.getTestFlinkFilter(
+                bootstrapServerAuth, registryUrl, kafkaUser, namespace)))
+            .editSpec()
+            .addToFlinkConfiguration(
+                flinkConfig
+            )
+            .editPodTemplate()
+                .editOrNewSpec()
+                    .editFirstContainer()
+                    .endFlinkdeploymentspecContainer()
+                .endFlinkdeploymentspecSpec()
+            .endPodTemplate()
+            .endSpec()
+            .build();
+        KubeResourceManager.getInstance()
+            .createOrUpdateResourceWithWait(flink);
 
         JobUtils.waitForJobSuccess(namespace, kafkaProducerClient.getProducerName(),
             TestFrameConstants.GLOBAL_TIMEOUT_MEDIUM);
 
@@ -509,9 +883,11 @@ void testFRocksDbStateBackend() {
         Wait.until("Task manager contains info about rocksdb", TestFrameConstants.GLOBAL_POLL_INTERVAL_LONG,
             TestFrameConstants.GLOBAL_TIMEOUT, () -> {
                 List<Pod>
taskManagerPods = KubeResourceManager.getKubeClient() - .listPodsByPrefixInName(namespace, namespace + "-taskmanager"); + .listPodsByPrefixInName(namespace, flinkDeploymentName + "-taskmanager"); for (Pod p : taskManagerPods) { - return KubeResourceManager.getKubeClient().getLogsFromPod(namespace, p.getMetadata().getName()) + return KubeResourceManager.getKubeClient() + .getLogsFromPod(namespace, p.getMetadata() + .getName()) .contains("State backend loader loads the state backend as EmbeddedRocksDBStateBackend"); } return false; @@ -530,18 +906,26 @@ void testFRocksDbStateBackend() { "security.protocol=SASL_PLAINTEXT\n" + "sasl.jaas.config=" + saslJaasConfigDecrypted ) - .withConsumerGroup("flink-filter-test-group").build(); + .withConsumerGroup("flink-filter-test-group") + .build(); - KubeResourceManager.getInstance().createResourceWithWait( - kafkaConsumerClient.consumerStrimzi() - ); + KubeResourceManager.getInstance() + .createResourceWithWait( + kafkaConsumerClient.consumerStrimzi() + ); JobUtils.waitForJobSuccess(namespace, kafkaConsumerClient.getConsumerName(), TestFrameConstants.GLOBAL_TIMEOUT_MEDIUM); - String consumerPodName = KubeResourceManager.getKubeClient().listPodsByPrefixInName(namespace, consumerName) - .get(0).getMetadata().getName(); - String log = KubeResourceManager.getKubeClient().getLogsFromPod(namespace, consumerPodName); + String consumerPodName = KubeResourceManager.getKubeClient() + .listPodsByPrefixInName(namespace, consumerName) + .get(0) + .getMetadata() + .getName(); + String log = KubeResourceManager.getKubeClient() + .getLogsFromPod(namespace, consumerPodName); assertTrue(log.contains("\"type\":\"paypal\"")); assertFalse(log.contains("\"type\":\"creditCard\"")); + + MinioUtils.waitForDataInMinio(namespace, bucketName); } }
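
For reference, the `Total size` parsing done by `MinioUtils.parseTotalSize` can be exercised on its own. A minimal sketch, assuming `mc stat` prints a line of the form `Total size: 4.2 KiB`; the sample input below is illustrative, not captured from a real run:

```java
import java.util.Map;
import java.util.regex.Matcher;
import java.util.regex.Pattern;

public class TotalSizeParseSketch {
    public static void main(String[] args) {
        // Hypothetical excerpt of `mc stat local/flink-bucket` output
        String bucketInfo = "Name      : flink-bucket\nTotal size: 4.2 KiB\nObjects   : 12";

        // Same named-group pattern as MinioUtils.parseTotalSize
        Pattern pattern = Pattern.compile("Total size:\\s*(?<size>[\\d.]+)\\s*(?<unit>.*)");
        Matcher matcher = pattern.matcher(bucketInfo);

        if (matcher.find()) {
            Map<String, Object> parsed = Map.of(
                "size", Double.parseDouble(matcher.group("size")),
                "unit", matcher.group("unit"));
            System.out.println(parsed); // e.g. {size=4.2, unit=KiB} (entry order may vary)
        }
    }
}
```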
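The S3 state-backend wiring that `testS3StateBackend` applies condenses to the map below. This is a standalone illustration rather than a complete FlinkDeployment: the keys and values are the ones the patch sets, with `SetupMinio`'s service name, port and credentials inlined.

```java
import java.util.HashMap;
import java.util.Map;

public class S3StateBackendConfigSketch {
    public static void main(String[] args) {
        String bucketName = "flink-bucket";
        String minioService = "minio";      // SetupMinio.MINIO
        int minioPort = 9000;               // SetupMinio.MINIO_PORT

        Map<String, String> flinkConfig = new HashMap<>();
        // RocksDB stays the state backend; only checkpoint/savepoint storage moves to S3
        flinkConfig.put("state.backend.type", "rocksdb");
        flinkConfig.put("state.checkpoints.dir", "s3://" + bucketName + "/" + minioService + ":" + minioPort);
        flinkConfig.put("state.savepoints.dir", "s3://" + bucketName + "/" + minioService + ":" + minioPort);
        // Minio runs HTTP-only in the test, hence the http endpoint and path-style access
        flinkConfig.put("s3.endpoint", "http://" + minioService + ":" + minioPort);
        flinkConfig.put("s3.path.style.access", "true");
        flinkConfig.put("s3.access-key", "minioadminLongerThan16BytesForFIPS");   // SetupMinio.ADMIN_CREDS
        flinkConfig.put("s3.secret-key", "minioadminLongerThan16BytesForFIPS");

        flinkConfig.forEach((k, v) -> System.out.println(k + "=" + v));
    }
}
```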