From 5f6e527f424a1b9711c0a7c33ae16a2c621d3658 Mon Sep 17 00:00:00 2001
From: Jakub Stejskal
Date: Mon, 4 Nov 2024 15:00:29 +0100
Subject: [PATCH] Use earliest-offset for Flink-kafka client

Signed-off-by: Jakub Stejskal
---
 .../streams/e2e/flink/sql/SqlJobRunnerST.md   |  2 +-
 .../java/io/streams/sql/TestStatements.java   |  6 ++-
 .../java/io/streams/utils/MinioUtils.java     | 43 +++++++++++++++++--
 .../streams/e2e/flink/sql/SqlJobRunnerST.java | 41 ++++++++----------
 4 files changed, 63 insertions(+), 29 deletions(-)

diff --git a/docs/io/streams/e2e/flink/sql/SqlJobRunnerST.md b/docs/io/streams/e2e/flink/sql/SqlJobRunnerST.md
index cee5f94..e01f790 100644
--- a/docs/io/streams/e2e/flink/sql/SqlJobRunnerST.md
+++ b/docs/io/streams/e2e/flink/sql/SqlJobRunnerST.md
@@ -79,7 +79,7 @@
 | 7. | Deploy FlinkDeployment with SQL which reads data from the flink.payment.data topic, filters payments of type paypal and sends them to the flink.payment.paypal topic; for authentication the secret created by KafkaUser is used, passed in by secret interpolation. Flink is configured to use S3 as a state backend | FlinkDeployment is up and tasks are deployed and it sends filtered data into flink.payment.paypal topic, task manager deployed by FlinkDeployment uses S3 |
 | 8. | Deploy strimzi-kafka-clients consumer as job and consume messages from kafka topic flink.payment.paypal | Consumer is deployed and it consumes messages |
 | 9. | Verify that messages are present | Messages are present |
-| 10. | Verify that taskmanager log contains 'State backend loader loads the state backend as HashMapStateBackend' | Log message is present |
+| 10. | Verify that taskmanager log contains 'State backend loader loads the state backend as EmbeddedRocksDBStateBackend' | Log message is present |
 | 11. | Verify that Minio contains some data from Flink | Flink bucket is not empty |
 
 **Labels:**
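For orientation, the filtering job from step 7 boils down to a single continuously running INSERT ... SELECT. Below is a hypothetical sketch of its shape; the real statement is assembled by TestStatements.getTestFlinkFilter() with the connector options shown in the next file, and the table names payment_fiat/payment_paypal are taken from the log assertions later in this patch.

```java
// Hypothetical sketch only: the actual SQL is built by TestStatements.getTestFlinkFilter().
public class PaymentFilterSqlSketch {
    // Source table is backed by flink.payment.data, sink table by flink.payment.paypal.
    public static final String FILTER_SQL =
        "INSERT INTO payment_paypal "
            + "SELECT * FROM payment_fiat "
            + "WHERE type = 'paypal'";    // creditCard and other payment types are dropped

    public static void main(String[] args) {
        System.out.println(FILTER_SQL);
    }
}
```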
diff --git a/src/main/java/io/streams/sql/TestStatements.java b/src/main/java/io/streams/sql/TestStatements.java
index 72fb0df..d983097 100644
--- a/src/main/java/io/streams/sql/TestStatements.java
+++ b/src/main/java/io/streams/sql/TestStatements.java
@@ -177,7 +177,8 @@ public static String getTestFlinkFilter(String bootstrap, String registryUrl, St
         additionalProperties.put("properties.group.id", "flink-filter-group");
         additionalProperties.put("value.format", "avro-confluent");
         additionalProperties.put("value.avro-confluent.url", registryUrl);
-        additionalProperties.put("scan.startup.mode", "latest-offset");
+        // Startup mode for the Kafka consumer; earliest-offset also catches messages sent before the job started
+        additionalProperties.put("scan.startup.mode", "earliest-offset");
         additionalProperties.put("properties.security.protocol", "SASL_PLAINTEXT");
         additionalProperties.put("properties.sasl.mechanism", "SCRAM-SHA-512");
         additionalProperties.put("properties.sasl.jaas.config",
@@ -239,7 +240,8 @@ public static String getWrongConnectionSql() {
         additionalProperties.put("properties.group.id", "flink-filter-group");
         additionalProperties.put("value.format", "avro-confluent");
         additionalProperties.put("value.avro-confluent.url", "not-exists-sr.cluster.local:5001");
-        additionalProperties.put("scan.startup.mode", "latest-offset");
+        // Startup mode for the Kafka consumer; earliest-offset also catches messages sent before the job started
+        additionalProperties.put("scan.startup.mode", "earliest-offset");
 
         SqlWith sqlWith = new SqlWithBuilder()
             .withSqlStatement(builder.toString())
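The change above flips scan.startup.mode from latest-offset to earliest-offset in both statement builders, so the Flink source also reads records produced before the job came up and the test no longer races the producer. A minimal, self-contained sketch of the connector options the builder renders into the SQL WITH clause; only the keys touched by this patch plus the group id from the surrounding code are shown.

```java
import java.util.HashMap;
import java.util.Map;

public class ScanStartupModeSketch {
    public static void main(String[] args) {
        Map<String, String> additionalProperties = new HashMap<>();
        additionalProperties.put("properties.group.id", "flink-filter-group");
        additionalProperties.put("value.format", "avro-confluent");
        // earliest-offset also consumes messages sent before the job started;
        // latest-offset would silently skip them.
        additionalProperties.put("scan.startup.mode", "earliest-offset");

        // Rendered as 'key' = 'value' pairs inside the WITH (...) clause.
        additionalProperties.forEach((k, v) -> System.out.printf("'%s' = '%s',%n", k, v));
    }
}
```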
diff --git a/src/main/java/io/streams/utils/MinioUtils.java b/src/main/java/io/streams/utils/MinioUtils.java
index de807dc..00b7b5b 100644
--- a/src/main/java/io/streams/utils/MinioUtils.java
+++ b/src/main/java/io/streams/utils/MinioUtils.java
@@ -49,7 +49,6 @@ public static String getBucketSizeInfo(String namespace, String bucketName) {
             "stat",
             "local/" + bucketName)
             .out();
-
     }
 
     /**
@@ -69,6 +68,23 @@ private static Map parseTotalSize(String bucketInfo) {
         }
     }
 
+    /**
+     * Parse out the object count of the bucket from the information about usage.
+     *
+     * @param bucketInfo String containing all stat info about bucket
+     * @return Number of objects in the bucket
+     */
+    private static int parseObjectCount(String bucketInfo) {
+        Pattern pattern = Pattern.compile("Objects count:\\s*(?<count>[\\d.]+)");
+        Matcher matcher = pattern.matcher(bucketInfo);
+
+        if (matcher.find()) {
+            return Integer.parseInt(matcher.group("count"));
+        } else {
+            throw new IllegalArgumentException("Objects count not found in the provided string");
+        }
+    }
+
     /**
      * Wait until size of the bucket is not 0 B.
      *
@@ -76,7 +92,7 @@ private static Map parseTotalSize(String bucketInfo) {
      * @param bucketName bucket name
      */
     public static void waitForDataInMinio(String namespace, String bucketName) {
-        Wait.until("data sync to Minio",
+        Wait.until("data sync from Kafka to Minio",
             TestFrameConstants.GLOBAL_POLL_INTERVAL_MEDIUM,
             TestFrameConstants.GLOBAL_TIMEOUT,
             () -> {
@@ -91,7 +107,28 @@ public static void waitForDataInMinio(String namespace, String bucketName) {
     }
 
     /**
-     * Wait until size of the bucket is 0 B.
+     * Wait until the bucket contains at least one object.
+     *
+     * @param namespace Minio location
+     * @param bucketName bucket name
+     */
+    public static void waitForObjectsInMinio(String namespace, String bucketName) {
+        Wait.until("data sync from Kafka to Minio",
+            TestFrameConstants.GLOBAL_POLL_INTERVAL_MEDIUM,
+            TestFrameConstants.GLOBAL_TIMEOUT,
+            () -> {
+                String bucketSizeInfo = getBucketSizeInfo(namespace, bucketName);
+                int objectCount = parseObjectCount(bucketSizeInfo);
+                LOGGER.info("Collected object count: {}", objectCount);
+                LOGGER.debug("Collected bucket info:\n{}", bucketSizeInfo);
+
+                return objectCount > 0;
+            });
+    }
+
+
+    /**
+     * Wait until bucket is empty.
      *
      * @param namespace Minio location
      * @param bucketName bucket name
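Note that parseObjectCount() resolves the named group count, so the pattern must declare it as (?<count>[\\d.]+); without the name, matcher.group("count") throws IllegalArgumentException. A self-contained sketch of the same parse against sample output; the exact formatting of `mc stat` output is an assumption here.

```java
import java.util.regex.Matcher;
import java.util.regex.Pattern;

public class ObjectCountParseSketch {
    public static void main(String[] args) {
        // Sample text mimicking `mc stat` output; real output may differ slightly.
        String bucketInfo = "Name         : flink-bucket\nObjects count: 12\nTotal size: 4.2KiB";

        Pattern pattern = Pattern.compile("Objects count:\\s*(?<count>[\\d.]+)");
        Matcher matcher = pattern.matcher(bucketInfo);
        if (matcher.find()) {
            System.out.println("Objects in bucket: " + Integer.parseInt(matcher.group("count")));
        } else {
            throw new IllegalArgumentException("Objects count not found in the provided string");
        }
    }
}
```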
diff --git a/src/test/java/io/streams/e2e/flink/sql/SqlJobRunnerST.java b/src/test/java/io/streams/e2e/flink/sql/SqlJobRunnerST.java
index c711064..4002745 100644
--- a/src/test/java/io/streams/e2e/flink/sql/SqlJobRunnerST.java
+++ b/src/test/java/io/streams/e2e/flink/sql/SqlJobRunnerST.java
@@ -847,10 +847,8 @@ void testS3StateBackend() {
         flinkConfig.put("execution.checkpointing.interval", "10000");
         flinkConfig.put("execution.checkpointing.snapshot-compression", "true");
         flinkConfig.put("kubernetes.operator.job.restart.failed", "true");
-//        flinkConfig.put("state.backend.rocksdb.compression.per.level_FLINK_JIRA", "SNAPPY_COMPRESSION");
         // rocksdb can be used as a state backend but the location is referenced in s3 instead of on a local pvc
-//        flinkConfig.put("state.backend.type", "rocksdb");
-        flinkConfig.put("state.backend", "filesystem");
+        flinkConfig.put("state.backend", "rocksdb");
         flinkConfig.put("state.checkpoints.dir", "s3://" + bucketName + "/" + SetupMinio.MINIO + ":" + SetupMinio.MINIO_PORT);
         flinkConfig.put("state.savepoints.dir", "s3://" + bucketName + "/" + SetupMinio.MINIO + ":" + SetupMinio.MINIO_PORT);
         // Currently Minio is deployed only in HTTP mode so we need to specify http in the url
@@ -881,7 +879,7 @@ void testS3StateBackend() {
         JobUtils.waitForJobSuccess(namespace, kafkaProducerClient.getProducerName(),
             TestFrameConstants.GLOBAL_TIMEOUT_MEDIUM);
 
-        //Check task manager log for presence rocksbd configuration
+        // Check task manager log for presence of checkpoint configuration
         Wait.until("Task manager contains info about state.backend", TestFrameConstants.GLOBAL_POLL_INTERVAL_LONG,
             TestFrameConstants.GLOBAL_TIMEOUT, () -> {
                 List<Pod> taskManagerPods = KubeResourceManager.getKubeClient()
@@ -890,28 +888,11 @@ void testS3StateBackend() {
                     return KubeResourceManager.getKubeClient()
                         .getLogsFromPod(namespace, p.getMetadata()
                             .getName())
-//                        .contains("State backend loader loads the state backend as EmbeddedRocksDBStateBackend");
-                        .contains("State backend loader loads the state backend as HashMapStateBackend");
+                        .contains("State backend loader loads the state backend as EmbeddedRocksDBStateBackend");
                 }
                 return false;
             });
 
-        // TODO remove
-        String consumerName1 = "pepa-consumer";
-        StrimziKafkaClients kafkaConsumerClient1 = new StrimziKafkaClientsBuilder()
-            .withConsumerName(consumerName1)
-            .withNamespaceName(namespace)
-            .withTopicName("flink.payment.data")
-            .withBootstrapAddress(bootstrapServerAuth)
-            .withMessageCount(100)
-            .withAdditionalConfig(
-                "sasl.mechanism=SCRAM-SHA-512\n" +
-                    "security.protocol=SASL_PLAINTEXT\n" +
-                    "sasl.jaas.config=" + saslJaasConfigDecrypted
-            )
-            .withConsumerGroup("flink-filter-test-group")
-            .build();
-
         // Run consumer and check if data are filtered
         String consumerName = "kafka-consumer";
         StrimziKafkaClients kafkaConsumerClient = new StrimziKafkaClientsBuilder()
@@ -945,6 +926,20 @@ void testS3StateBackend() {
         assertTrue(log.contains("\"type\":\"paypal\""));
         assertFalse(log.contains("\"type\":\"creditCard\""));
 
-        MinioUtils.waitForDataInMinio(namespace, bucketName);
+        MinioUtils.waitForObjectsInMinio(namespace, bucketName);
+        String flinkDeploymentPodName = KubeResourceManager.getKubeClient()
+            .listPodsByPrefixInName(namespace, flinkDeploymentName)
+            .stream()
+            .filter(pod -> !pod.getMetadata()
+                .getName()
+                .contains("taskmanager"))
+            .toList()
+            .get(0)
+            .getMetadata()
+            .getName();
+
+        log = KubeResourceManager.getKubeClient().getLogsFromPod(namespace, flinkDeploymentPodName);
+        assertTrue(log.contains("Committing minio:9000"));
+        assertTrue(log.contains("Marking checkpoint 1 as completed for source Source: payment_fiat"));
     }
 }
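With state.backend set to rocksdb, the task manager should log the EmbeddedRocksDBStateBackend loader message the test waits for, and completed checkpoints land in the Minio bucket polled by waitForObjectsInMinio(). A minimal sketch of the relevant configuration; "flink-bucket" and minio:9000 are placeholders for the bucketName variable and the SetupMinio constants resolved at runtime.

```java
import java.util.HashMap;
import java.util.Map;

public class RocksDbStateBackendSketch {
    public static void main(String[] args) {
        Map<String, String> flinkConfig = new HashMap<>();
        flinkConfig.put("execution.checkpointing.interval", "10000");
        flinkConfig.put("execution.checkpointing.snapshot-compression", "true");
        // RocksDB keeps working state locally and checkpoints it to S3 (Minio),
        // which is what the "Committing minio:9000" log assertion verifies.
        flinkConfig.put("state.backend", "rocksdb");
        flinkConfig.put("state.checkpoints.dir", "s3://flink-bucket/minio:9000");
        flinkConfig.put("state.savepoints.dir", "s3://flink-bucket/minio:9000");

        flinkConfig.forEach((k, v) -> System.out.println(k + ": " + v));
    }
}
```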