Skip to content

Commit

Permalink
KAFKA-16331: Remove EOSv1 from Kafka Streams system tests (#17108)
Browse files Browse the repository at this point in the history
Reviewers: Chia-Ping Tsai <[email protected]>, Bill Bejeck <[email protected]>
  • Loading branch information
mjsax authored Sep 11, 2024
1 parent 0af75c0 commit 6fd973b
Show file tree
Hide file tree
Showing 8 changed files with 39 additions and 76 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -742,9 +742,9 @@ boolean runLoop() {
errorMessage.startsWith("Broker unexpectedly doesn't support requireStable flag on version ")) {

log.error("Shutting down because the Kafka cluster seems to be on a too old version. " +
"Setting {}=\"{}\"/\"{}\" requires broker version 2.5 or higher.",
"Setting {}=\"{}\" requires broker version 2.5 or higher.",
StreamsConfig.PROCESSING_GUARANTEE_CONFIG,
StreamsConfig.EXACTLY_ONCE_V2, StreamsConfig.EXACTLY_ONCE_BETA);
StreamsConfig.EXACTLY_ONCE_V2);
}
failedStreamThreadSensor.record();
this.streamsUncaughtExceptionHandler.accept(new StreamsException(e), false);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -68,8 +68,8 @@ public static void main(final String[] args) throws IOException {

streamsProperties.put(StreamsConfig.APPLICATION_ID_CONFIG, "kafka-streams-system-test-broker-compatibility");
streamsProperties.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");
streamsProperties.put(StreamsConfig.DEFAULT_KEY_SERDE_CLASS_CONFIG, Serdes.String().getClass());
streamsProperties.put(StreamsConfig.DEFAULT_VALUE_SERDE_CLASS_CONFIG, Serdes.String().getClass());
streamsProperties.put(StreamsConfig.DEFAULT_KEY_SERDE_CLASS_CONFIG, Serdes.StringSerde.class);
streamsProperties.put(StreamsConfig.DEFAULT_VALUE_SERDE_CLASS_CONFIG, Serdes.StringSerde.class);
streamsProperties.put(StreamsConfig.COMMIT_INTERVAL_MS_CONFIG, 100L);
streamsProperties.put(StreamsConfig.STATESTORE_CACHE_MAX_BYTES_CONFIG, 0);
streamsProperties.put(StreamsConfig.PROCESSING_GUARANTEE_CONFIG, processingMode);
Expand Down Expand Up @@ -103,7 +103,7 @@ public static void main(final String[] args) throws IOException {
System.out.println("start Kafka Streams");
streams.start();

final boolean eosEnabled = processingMode.startsWith("exactly_once");
final boolean eosEnabled = processingMode.equals("exactly_once_v2");

System.out.println("send data");
final Properties producerProperties = new Properties();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -29,7 +29,6 @@ public class StreamsEosTest {
* args ::= kafka propFileName command
* command := "run" | "process" | "verify"
*/
@SuppressWarnings("deprecation")
public static void main(final String[] args) throws IOException {
if (args.length < 2) {
System.err.println("StreamsEosTest are expecting two parameters: propFile, command; but only see " + args.length + " parameter");
Expand All @@ -49,12 +48,9 @@ public static void main(final String[] args) throws IOException {
}

if ("process".equals(command) || "process-complex".equals(command)) {
if (!StreamsConfig.EXACTLY_ONCE.equals(processingGuarantee) &&
!StreamsConfig.EXACTLY_ONCE_BETA.equals(processingGuarantee) &&
!StreamsConfig.EXACTLY_ONCE_V2.equals(processingGuarantee)) {
if (!StreamsConfig.EXACTLY_ONCE_V2.equals(processingGuarantee)) {

System.err.println("processingGuarantee must be either " + StreamsConfig.EXACTLY_ONCE + " or " +
StreamsConfig.EXACTLY_ONCE_BETA + " or " + StreamsConfig.EXACTLY_ONCE_V2);
System.err.println("processingGuarantee must be " + StreamsConfig.EXACTLY_ONCE_V2);
Exit.exit(1);
}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -38,7 +38,6 @@ public class StreamsSmokeTest {
*
* @param args
*/
@SuppressWarnings("deprecation")
public static void main(final String[] args) throws IOException {
if (args.length < 2) {
System.err.println("StreamsSmokeTest are expecting two parameters: propFile, command; but only see " + args.length + " parameter");
Expand All @@ -60,14 +59,10 @@ public static void main(final String[] args) throws IOException {

if ("process".equals(command)) {
if (!StreamsConfig.AT_LEAST_ONCE.equals(processingGuarantee) &&
!StreamsConfig.EXACTLY_ONCE.equals(processingGuarantee) &&
!StreamsConfig.EXACTLY_ONCE_BETA.equals(processingGuarantee) &&
!StreamsConfig.EXACTLY_ONCE_V2.equals(processingGuarantee)) {

System.err.println("processingGuarantee must be either " +
StreamsConfig.AT_LEAST_ONCE + ", " +
StreamsConfig.EXACTLY_ONCE + ", or " +
StreamsConfig.EXACTLY_ONCE_BETA + ", or " +
StreamsConfig.EXACTLY_ONCE_V2);

Exit.exit(1);
Expand Down
19 changes: 9 additions & 10 deletions tests/kafkatest/services/streams.py
Original file line number Diff line number Diff line change
Expand Up @@ -384,17 +384,16 @@ class StreamsEosTestBaseService(StreamsTestBaseService):

clean_node_enabled = True

def __init__(self, test_context, kafka, processing_guarantee, command):
def __init__(self, test_context, kafka, command):
super(StreamsEosTestBaseService, self).__init__(test_context,
kafka,
"org.apache.kafka.streams.tests.StreamsEosTest",
command)
self.PROCESSING_GUARANTEE = processing_guarantee

def prop_file(self):
properties = {streams_property.STATE_DIR: self.PERSISTENT_ROOT,
streams_property.KAFKA_SERVERS: self.kafka.bootstrap_servers(),
streams_property.PROCESSING_GUARANTEE: self.PROCESSING_GUARANTEE,
streams_property.PROCESSING_GUARANTEE: "exactly_once_v2",
"acceptable.recovery.lag": "9223372036854775807", # enable a one-shot assignment
"session.timeout.ms": "10000" # set back to 10s for tests. See KIP-735
}
Expand Down Expand Up @@ -440,24 +439,24 @@ def __init__(self, test_context, kafka, processing_guarantee, num_threads = 3, r

class StreamsEosTestDriverService(StreamsEosTestBaseService):
def __init__(self, test_context, kafka):
super(StreamsEosTestDriverService, self).__init__(test_context, kafka, "not-required", "run")
super(StreamsEosTestDriverService, self).__init__(test_context, kafka, "run")

class StreamsEosTestJobRunnerService(StreamsEosTestBaseService):
def __init__(self, test_context, kafka, processing_guarantee):
super(StreamsEosTestJobRunnerService, self).__init__(test_context, kafka, processing_guarantee, "process")
def __init__(self, test_context, kafka):
super(StreamsEosTestJobRunnerService, self).__init__(test_context, kafka, "process")

class StreamsComplexEosTestJobRunnerService(StreamsEosTestBaseService):
def __init__(self, test_context, kafka, processing_guarantee):
super(StreamsComplexEosTestJobRunnerService, self).__init__(test_context, kafka, processing_guarantee, "process-complex")
def __init__(self, test_context, kafka):
super(StreamsComplexEosTestJobRunnerService, self).__init__(test_context, kafka, "process-complex")

class StreamsEosTestVerifyRunnerService(StreamsEosTestBaseService):
def __init__(self, test_context, kafka):
super(StreamsEosTestVerifyRunnerService, self).__init__(test_context, kafka, "not-required", "verify")
super(StreamsEosTestVerifyRunnerService, self).__init__(test_context, kafka, "verify")


class StreamsComplexEosTestVerifyRunnerService(StreamsEosTestBaseService):
def __init__(self, test_context, kafka):
super(StreamsComplexEosTestVerifyRunnerService, self).__init__(test_context, kafka, "not-required", "verify-complex")
super(StreamsComplexEosTestVerifyRunnerService, self).__init__(test_context, kafka, "verify-complex")


class StreamsSmokeTestShutdownDeadlockService(StreamsSmokeTestBaseService):
Expand Down
27 changes: 2 additions & 25 deletions tests/kafkatest/tests/streams/streams_broker_compatibility_test.py
Original file line number Diff line number Diff line change
Expand Up @@ -87,29 +87,6 @@ def test_compatible_brokers_eos_disabled(self, broker_version):
self.consumer.stop()
self.kafka.stop()

@cluster(num_nodes=4)
@matrix(broker_version=[str(LATEST_0_11_0),str(LATEST_1_0),str(LATEST_1_1),str(LATEST_2_0),
str(LATEST_2_1),str(LATEST_2_2),str(LATEST_2_3),str(LATEST_2_4),
str(LATEST_2_5),str(LATEST_2_6),str(LATEST_2_7),str(LATEST_2_8),
str(LATEST_3_0),str(LATEST_3_1),str(LATEST_3_2),str(LATEST_3_3),
str(LATEST_3_4),str(LATEST_3_5),str(LATEST_3_6),str(LATEST_3_7),
str(LATEST_3_8)])
def test_compatible_brokers_eos_alpha_enabled(self, broker_version):
self.kafka.set_version(KafkaVersion(broker_version))
self.kafka.start()

processor = StreamsBrokerCompatibilityService(self.test_context, self.kafka, "exactly_once")
processor.start()

self.consumer.start()

processor.wait()

wait_until(lambda: self.consumer.total_consumed() > 0, timeout_sec=30, err_msg="Did expect to read a message but got none within 30 seconds.")

self.consumer.stop()
self.kafka.stop()

@cluster(num_nodes=4)
@matrix(broker_version=[str(LATEST_2_5),str(LATEST_2_6),str(LATEST_2_7),str(LATEST_2_8),
str(LATEST_3_0),str(LATEST_3_1),str(LATEST_3_2),str(LATEST_3_3),
Expand Down Expand Up @@ -167,9 +144,9 @@ def test_fail_fast_on_incompatible_brokers_if_eos_v2_enabled(self, broker_versio
with processor.node.account.monitor_log(processor.STDERR_FILE) as monitor:
with processor.node.account.monitor_log(processor.LOG_FILE) as log:
processor.start()
log.wait_until('Shutting down because the Kafka cluster seems to be on a too old version. Setting processing\.guarantee="exactly_once_v2"/"exactly_once_beta" requires broker version 2\.5 or higher\.',
log.wait_until('Shutting down because the Kafka cluster seems to be on a too old version. Setting processing\.guarantee="exactly_once_v2" requires broker version 2\.5 or higher\.',
timeout_sec=60,
err_msg="Never saw 'Shutting down because the Kafka cluster seems to be on a too old version. Setting `processing.guarantee=\"exactly_once_v2\"/\"exactly_once_beta\"` requires broker version 2.5 or higher.' log message " + str(processor.node.account))
err_msg="Never saw 'Shutting down because the Kafka cluster seems to be on a too old version. Setting `processing.guarantee=\"exactly_once_v2\"` requires broker version 2.5 or higher.' log message " + str(processor.node.account))
monitor.wait_until('FATAL: An unexpected exception org.apache.kafka.common.errors.UnsupportedVersionException',
timeout_sec=60,
err_msg="Never saw 'FATAL: An unexpected exception org.apache.kafka.common.errors.UnsupportedVersionException' error message " + str(processor.node.account))
Expand Down
44 changes: 20 additions & 24 deletions tests/kafkatest/tests/streams/streams_eos_test.py
Original file line number Diff line number Diff line change
Expand Up @@ -39,21 +39,19 @@ def __init__(self, test_context):
self.test_context = test_context

@cluster(num_nodes=9)
@matrix(processing_guarantee=["exactly_once", "exactly_once_v2"],
metadata_quorum=[quorum.isolated_kraft])
def test_rebalance_simple(self, processing_guarantee, metadata_quorum):
self.run_rebalance(StreamsEosTestJobRunnerService(self.test_context, self.kafka, processing_guarantee),
StreamsEosTestJobRunnerService(self.test_context, self.kafka, processing_guarantee),
StreamsEosTestJobRunnerService(self.test_context, self.kafka, processing_guarantee),
@matrix(metadata_quorum=[quorum.isolated_kraft])
def test_rebalance_simple(self, metadata_quorum):
self.run_rebalance(StreamsEosTestJobRunnerService(self.test_context, self.kafka),
StreamsEosTestJobRunnerService(self.test_context, self.kafka),
StreamsEosTestJobRunnerService(self.test_context, self.kafka),
StreamsEosTestVerifyRunnerService(self.test_context, self.kafka))

@cluster(num_nodes=9)
@matrix(processing_guarantee=["exactly_once", "exactly_once_v2"],
metadata_quorum=[quorum.isolated_kraft])
def test_rebalance_complex(self, processing_guarantee, metadata_quorum):
self.run_rebalance(StreamsComplexEosTestJobRunnerService(self.test_context, self.kafka, processing_guarantee),
StreamsComplexEosTestJobRunnerService(self.test_context, self.kafka, processing_guarantee),
StreamsComplexEosTestJobRunnerService(self.test_context, self.kafka, processing_guarantee),
@matrix(metadata_quorum=[quorum.isolated_kraft])
def test_rebalance_complex(self, metadata_quorum):
self.run_rebalance(StreamsComplexEosTestJobRunnerService(self.test_context, self.kafka),
StreamsComplexEosTestJobRunnerService(self.test_context, self.kafka),
StreamsComplexEosTestJobRunnerService(self.test_context, self.kafka),
StreamsComplexEosTestVerifyRunnerService(self.test_context, self.kafka))

def run_rebalance(self, processor1, processor2, processor3, verifier):
Expand Down Expand Up @@ -83,21 +81,19 @@ def run_rebalance(self, processor1, processor2, processor3, verifier):
verifier.node.account.ssh("grep ALL-RECORDS-DELIVERED %s" % verifier.STDOUT_FILE, allow_fail=False)

@cluster(num_nodes=9)
@matrix(processing_guarantee=["exactly_once", "exactly_once_v2"],
metadata_quorum=[quorum.isolated_kraft])
def test_failure_and_recovery(self, processing_guarantee, metadata_quorum):
self.run_failure_and_recovery(StreamsEosTestJobRunnerService(self.test_context, self.kafka, processing_guarantee),
StreamsEosTestJobRunnerService(self.test_context, self.kafka, processing_guarantee),
StreamsEosTestJobRunnerService(self.test_context, self.kafka, processing_guarantee),
@matrix(metadata_quorum=[quorum.isolated_kraft])
def test_failure_and_recovery(self, metadata_quorum):
self.run_failure_and_recovery(StreamsEosTestJobRunnerService(self.test_context, self.kafka),
StreamsEosTestJobRunnerService(self.test_context, self.kafka),
StreamsEosTestJobRunnerService(self.test_context, self.kafka),
StreamsEosTestVerifyRunnerService(self.test_context, self.kafka))

@cluster(num_nodes=9)
@matrix(processing_guarantee=["exactly_once", "exactly_once_v2"],
metadata_quorum=[quorum.isolated_kraft])
def test_failure_and_recovery_complex(self, processing_guarantee, metadata_quorum):
self.run_failure_and_recovery(StreamsComplexEosTestJobRunnerService(self.test_context, self.kafka, processing_guarantee),
StreamsComplexEosTestJobRunnerService(self.test_context, self.kafka, processing_guarantee),
StreamsComplexEosTestJobRunnerService(self.test_context, self.kafka, processing_guarantee),
@matrix(metadata_quorum=[quorum.isolated_kraft])
def test_failure_and_recovery_complex(self, metadata_quorum):
self.run_failure_and_recovery(StreamsComplexEosTestJobRunnerService(self.test_context, self.kafka),
StreamsComplexEosTestJobRunnerService(self.test_context, self.kafka),
StreamsComplexEosTestJobRunnerService(self.test_context, self.kafka),
StreamsComplexEosTestVerifyRunnerService(self.test_context, self.kafka))

def run_failure_and_recovery(self, processor1, processor2, processor3, verifier):
Expand Down
2 changes: 1 addition & 1 deletion tests/kafkatest/tests/streams/streams_smoke_test.py
Original file line number Diff line number Diff line change
Expand Up @@ -47,7 +47,7 @@ def __init__(self, test_context):
self.driver = StreamsSmokeTestDriverService(test_context, self.kafka)

@cluster(num_nodes=8)
@matrix(processing_guarantee=['exactly_once', 'exactly_once_v2', 'at_least_once'],
@matrix(processing_guarantee=['exactly_once_v2', 'at_least_once'],
crash=[True, False],
metadata_quorum=quorum.all_non_upgrade)
def test_streams(self, processing_guarantee, crash, metadata_quorum=quorum.zk):
Expand Down

0 comments on commit 6fd973b

Please sign in to comment.