Use earliest-offset for Flink-kafka client
Signed-off-by: Jakub Stejskal <[email protected]>
Frawless committed Nov 4, 2024
1 parent 7389659 commit 5f6e527
Showing 4 changed files with 63 additions and 29 deletions.
2 changes: 1 addition & 1 deletion docs/io/streams/e2e/flink/sql/SqlJobRunnerST.md
@@ -79,7 +79,7 @@
| 7. | Deploy FlinkDeployment with SQL that reads data from the flink.payment.data topic, filters payments of type paypal, and sends them to the flink.payment.paypal topic; authentication uses the secret created by the KafkaUser, which is passed in via secret interpolation. Flink is configured to use S3 as a state backend (a sketch of the filter statement follows this table) | FlinkDeployment is up, its tasks are deployed, and it sends filtered data into the flink.payment.paypal topic; the task manager deployed by the FlinkDeployment uses S3 |
| 8. | Deploy strimzi-kafka-clients consumer as a job and consume messages from Kafka topic flink.payment.paypal | Consumer is deployed and it consumes messages |
| 9. | Verify that messages are present | Messages are present |
- | 10. | Verify that taskmanager logs contain 'State backend loader loads the state backend as HashMapStateBackend' | Log message is present |
+ | 10. | Verify that taskmanager logs contain 'State backend loader loads the state backend as EmbeddedRocksDBStateBackend' | Log message is present |
| 11. | Verify that Minio contains some data from Flink | Flink bucket is not empty |
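For orientation, the filter submitted in step 7 boils down to a single Flink SQL INSERT ... SELECT between two Kafka-backed tables. A minimal sketch, assuming the source table is named payment_fiat (the name that appears in this commit's checkpoint-log assertion) and a hypothetical sink table payment_paypal:

// Sketch only: the real statement is built by TestStatements.getTestFlinkFilter();
// payment_paypal is a hypothetical sink-table name used for illustration.
String filterSql =
    "INSERT INTO payment_paypal "
        + "SELECT * FROM payment_fiat "
        + "WHERE type = 'paypal'";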

**Labels:**
6 changes: 4 additions & 2 deletions src/main/java/io/streams/sql/TestStatements.java
@@ -177,7 +177,8 @@ public static String getTestFlinkFilter(String bootstrap, String registryUrl, St
additionalProperties.put("properties.group.id", "flink-filter-group");
additionalProperties.put("value.format", "avro-confluent");
additionalProperties.put("value.avro-confluent.url", registryUrl);
additionalProperties.put("scan.startup.mode", "latest-offset");
// Startup mode for Kafka consumer, we set earliest to catch even previously sent messages
additionalProperties.put("scan.startup.mode", "earliest-offset");
additionalProperties.put("properties.security.protocol", "SASL_PLAINTEXT");
additionalProperties.put("properties.sasl.mechanism", "SCRAM-SHA-512");
additionalProperties.put("properties.sasl.jaas.config",
@@ -239,7 +240,8 @@ public static String getWrongConnectionSql() {
additionalProperties.put("properties.group.id", "flink-filter-group");
additionalProperties.put("value.format", "avro-confluent");
additionalProperties.put("value.avro-confluent.url", "not-exists-sr.cluster.local:5001");
additionalProperties.put("scan.startup.mode", "latest-offset");
// Startup mode for Kafka consumer, we set earliest to catch even previously sent messages
additionalProperties.put("scan.startup.mode", "earliest-offset");

SqlWith sqlWith = new SqlWithBuilder()
.withSqlStatement(builder.toString())
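For context, the options above are standard Flink Kafka-connector table options that end up in the WITH clause of the CREATE TABLE statement assembled via SqlWithBuilder. A minimal sketch of the resulting source-table DDL; values in angle brackets are placeholders and the column list is elided:

// Sketch only; the real DDL is produced by TestStatements and SqlWith.
String createSource =
    "CREATE TABLE payment_fiat ( ... ) WITH ("
        + " 'connector' = 'kafka',"
        + " 'topic' = 'flink.payment.data',"
        + " 'properties.bootstrap.servers' = '<bootstrap>',"   // placeholder
        + " 'properties.group.id' = 'flink-filter-group',"
        + " 'value.format' = 'avro-confluent',"
        + " 'value.avro-confluent.url' = '<registryUrl>',"     // placeholder
        // earliest-offset replays records produced before the job started;
        // latest-offset would only pick up new ones
        + " 'scan.startup.mode' = 'earliest-offset',"
        + " 'properties.security.protocol' = 'SASL_PLAINTEXT',"
        + " 'properties.sasl.mechanism' = 'SCRAM-SHA-512'"
        + ")";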
43 changes: 40 additions & 3 deletions src/main/java/io/streams/utils/MinioUtils.java
@@ -49,7 +49,6 @@ public static String getBucketSizeInfo(String namespace, String bucketName) {
"stat",
"local/" + bucketName)
.out();

}

/**
@@ -69,14 +68,31 @@ private static Map<String, Object> parseTotalSize(String bucketInfo) {
}
}

+ /**
+ * Parse out the object count from the information about bucket usage.
+ *
+ * @param bucketInfo String containing all stat info about the bucket
+ * @return number of objects in the bucket
+ */
+ private static int parseObjectCount(String bucketInfo) {
+ Pattern pattern = Pattern.compile("Objects count:\\s*(?<count>\\d+)");
+ Matcher matcher = pattern.matcher(bucketInfo);
+
+ if (matcher.find()) {
+ return Integer.parseInt(matcher.group("count"));
+ } else {
+ throw new IllegalArgumentException("Objects count not found in the provided string");
+ }
+ }
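parseObjectCount assumes the mc stat output returned by getBucketSizeInfo contains an "Objects count:" line. A small illustrative check; the sample output below is made up for the example, not captured from a real run:

// Illustrative stat output; a real `mc stat` result carries more fields.
String sample = String.join("\n",
    "Name : local/flink-bucket",   // hypothetical bucket
    "Objects count: 7");
// parseObjectCount(sample) returns 7; \s* in the pattern tolerates extra spaces after the colon.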

/**
* Wait until size of the bucket is not 0 B.
*
* @param namespace Minio location
* @param bucketName bucket name
*/
public static void waitForDataInMinio(String namespace, String bucketName) {
Wait.until("data sync to Minio",
Wait.until("data sync from Kafka to Minio",
TestFrameConstants.GLOBAL_POLL_INTERVAL_MEDIUM,
TestFrameConstants.GLOBAL_TIMEOUT,
() -> {
@@ -91,7 +107,28 @@ public static void waitForDataInMinio(String namespace, String bucketName) {
}

/**
- * Wait until size of the bucket is 0 B.
+ * Wait until the bucket contains at least one object.
*
* @param namespace Minio location
* @param bucketName bucket name
*/
+ public static void waitForObjectsInMinio(String namespace, String bucketName) {
+ Wait.until("data sync from Kafka to Minio",
+ TestFrameConstants.GLOBAL_POLL_INTERVAL_MEDIUM,
+ TestFrameConstants.GLOBAL_TIMEOUT,
+ () -> {
+ String bucketSizeInfo = getBucketSizeInfo(namespace, bucketName);
+ int objectCount = parseObjectCount(bucketSizeInfo);
+ LOGGER.info("Collected object count: {}", objectCount);
+ LOGGER.debug("Collected bucket info:\n{}", bucketSizeInfo);
+
+ return objectCount > 0;
+ });
+ }
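A minimal usage sketch with a hypothetical namespace and bucket; the S3 state-backend test below switches from the size-based wait to this object-count wait:

// Blocks until the bucket reports at least one object,
// or fails after TestFrameConstants.GLOBAL_TIMEOUT.
MinioUtils.waitForObjectsInMinio("flink-tests", "flink-state-bucket");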


/**
* Wait until bucket is empty.
*
* @param namespace Minio location
* @param bucketName bucket name
41 changes: 18 additions & 23 deletions src/test/java/io/streams/e2e/flink/sql/SqlJobRunnerST.java
@@ -847,10 +845,8 @@ void testS3StateBackend() {
flinkConfig.put("execution.checkpointing.interval", "10000");
flinkConfig.put("execution.checkpointing.snapshot-compression", "true");
flinkConfig.put("kubernetes.operator.job.restart.failed", "true");
// flinkConfig.put("state.backend.rocksdb.compression.per.level_FLINK_JIRA", "SNAPPY_COMPRESSION");
// rocksdb can be used as a state backend but the location is referenced in s3 instead on local pvc
// flinkConfig.put("state.backend.type", "rocksdb");
flinkConfig.put("state.backend", "filesystem");
flinkConfig.put("state.backend", "rocksdb");
flinkConfig.put("state.checkpoints.dir", "s3://" + bucketName + "/" + SetupMinio.MINIO + ":" + SetupMinio.MINIO_PORT);
flinkConfig.put("state.savepoints.dir", "s3://" + bucketName + "/" + SetupMinio.MINIO + ":" + SetupMinio.MINIO_PORT);
// Currently Minio is deployed only in HTTP mode so we need to specify http in the url
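Condensed, the checkpointing half of this configuration pins RocksDB state to the Minio-backed S3 bucket. A sketch of the pattern with placeholder paths; the s3.* keys are an assumption about what the folded lines set for an HTTP-only Minio, not copied from this diff:

// Sketch of the state-backend configuration the test assembles; <bucket> is a placeholder.
Map<String, String> cfg = new HashMap<>();
cfg.put("execution.checkpointing.interval", "10000");            // checkpoint every 10 s
cfg.put("execution.checkpointing.snapshot-compression", "true");
cfg.put("state.backend", "rocksdb");                             // embedded RocksDB keyed state
cfg.put("state.checkpoints.dir", "s3://<bucket>/checkpoints");   // placeholder path
cfg.put("state.savepoints.dir", "s3://<bucket>/savepoints");     // placeholder path
cfg.put("s3.endpoint", "http://minio:9000");                     // assumed: Minio runs HTTP only
cfg.put("s3.path.style.access", "true");                         // assumed: path-style access for Minio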
@@ -881,7 +879,7 @@ void testS3StateBackend() {
JobUtils.waitForJobSuccess(namespace, kafkaProducerClient.getProducerName(),
TestFrameConstants.GLOBAL_TIMEOUT_MEDIUM);

- //Check task manager log for presence rocksbd configuration
+ // Check task manager log for presence of checkpoint configuration
Wait.until("Task manager contains info about state.backend", TestFrameConstants.GLOBAL_POLL_INTERVAL_LONG,
TestFrameConstants.GLOBAL_TIMEOUT, () -> {
List<Pod> taskManagerPods = KubeResourceManager.getKubeClient()
@@ -890,28 +888,11 @@
return KubeResourceManager.getKubeClient()
.getLogsFromPod(namespace, p.getMetadata()
.getName())
// .contains("State backend loader loads the state backend as EmbeddedRocksDBStateBackend");
.contains("State backend loader loads the state backend as HashMapStateBackend");
.contains("State backend loader loads the state backend as EmbeddedRocksDBStateBackend");
}
return false;
});

- // TODO remove
- String consumerName1 = "pepa-consumer";
- StrimziKafkaClients kafkaConsumerClient1 = new StrimziKafkaClientsBuilder()
- .withConsumerName(consumerName1)
- .withNamespaceName(namespace)
- .withTopicName("flink.payment.data")
- .withBootstrapAddress(bootstrapServerAuth)
- .withMessageCount(100)
- .withAdditionalConfig(
- "sasl.mechanism=SCRAM-SHA-512\n" +
- "security.protocol=SASL_PLAINTEXT\n" +
- "sasl.jaas.config=" + saslJaasConfigDecrypted
- )
- .withConsumerGroup("flink-filter-test-group")
- .build();

// Run consumer and check if data are filtered
String consumerName = "kafka-consumer";
StrimziKafkaClients kafkaConsumerClient = new StrimziKafkaClientsBuilder()
@@ -945,6 +926,20 @@ void testS3StateBackend() {
assertTrue(log.contains("\"type\":\"paypal\""));
assertFalse(log.contains("\"type\":\"creditCard\""));

- MinioUtils.waitForDataInMinio(namespace, bucketName);
+ MinioUtils.waitForObjectsInMinio(namespace, bucketName);
+ String flinkDeploymentPodName = KubeResourceManager.getKubeClient()
+ .listPodsByPrefixInName(namespace, flinkDeploymentName)
+ .stream()
+ .filter(pod -> !pod.getMetadata()
+ .getName()
+ .contains("taskmanager"))
+ .toList()
+ .get(0)
+ .getMetadata()
+ .getName();
+
+ log = KubeResourceManager.getKubeClient().getLogsFromPod(namespace, flinkDeploymentPodName);
+ assertTrue(log.contains("Committing minio:9000"));
+ assertTrue(log.contains("Marking checkpoint 1 as completed for source Source: payment_fiat"));
}
}
