From 79753594ca184170fc9b0ebbec53d204d0092a2d Mon Sep 17 00:00:00 2001 From: Ken Huang Date: Sat, 21 Sep 2024 02:36:13 +0800 Subject: [PATCH 1/5] KAFKA-16813 Add global timeout (60s) for `@ClusterTemplate`, `@ClusterTest` and `@ClusterTests` (#16957) Reviewers: TaiJuWu , Chia-Ping Tsai --- core/src/test/java/kafka/test/annotation/ClusterTemplate.java | 2 ++ core/src/test/java/kafka/test/annotation/ClusterTest.java | 2 ++ core/src/test/java/kafka/test/annotation/ClusterTests.java | 2 ++ .../kafka/server/KafkaServerKRaftRegistrationTest.scala | 2 -- .../unit/kafka/server/AllocateProducerIdsRequestTest.scala | 2 -- .../scala/unit/kafka/server/BrokerRegistrationRequestTest.scala | 2 -- .../unit/kafka/server/ConsumerGroupDescribeRequestTest.scala | 2 -- .../unit/kafka/server/ConsumerGroupHeartbeatRequestTest.scala | 2 -- .../scala/unit/kafka/server/ConsumerProtocolMigrationTest.scala | 2 -- .../test/scala/unit/kafka/server/DeleteGroupsRequestTest.scala | 2 -- .../scala/unit/kafka/server/DescribeGroupsRequestTest.scala | 2 -- .../scala/unit/kafka/server/DescribeQuorumRequestTest.scala | 2 -- .../src/test/scala/unit/kafka/server/HeartbeatRequestTest.scala | 2 -- .../src/test/scala/unit/kafka/server/JoinGroupRequestTest.scala | 2 -- .../test/scala/unit/kafka/server/LeaveGroupRequestTest.scala | 2 -- .../test/scala/unit/kafka/server/ListGroupsRequestTest.scala | 2 -- .../test/scala/unit/kafka/server/OffsetCommitRequestTest.scala | 2 -- .../test/scala/unit/kafka/server/OffsetDeleteRequestTest.scala | 2 -- .../test/scala/unit/kafka/server/OffsetFetchRequestTest.scala | 2 -- .../src/test/scala/unit/kafka/server/SyncGroupRequestTest.scala | 2 -- tools/src/test/java/org/apache/kafka/tools/ClusterToolTest.java | 2 -- 21 files changed, 6 insertions(+), 36 deletions(-) diff --git a/core/src/test/java/kafka/test/annotation/ClusterTemplate.java b/core/src/test/java/kafka/test/annotation/ClusterTemplate.java index eb42db9c5a0f9..d4f0b95f581f6 100644 --- a/core/src/test/java/kafka/test/annotation/ClusterTemplate.java +++ b/core/src/test/java/kafka/test/annotation/ClusterTemplate.java @@ -22,6 +22,7 @@ import org.junit.jupiter.api.Tag; import org.junit.jupiter.api.TestTemplate; +import org.junit.jupiter.api.Timeout; import java.lang.annotation.Documented; import java.lang.annotation.Retention; @@ -49,6 +50,7 @@ @Target({METHOD}) @Retention(RUNTIME) @TestTemplate +@Timeout(60) @Tag("integration") public @interface ClusterTemplate { /** diff --git a/core/src/test/java/kafka/test/annotation/ClusterTest.java b/core/src/test/java/kafka/test/annotation/ClusterTest.java index e61b1cb9781e2..646c84bec75ff 100644 --- a/core/src/test/java/kafka/test/annotation/ClusterTest.java +++ b/core/src/test/java/kafka/test/annotation/ClusterTest.java @@ -22,6 +22,7 @@ import org.junit.jupiter.api.Tag; import org.junit.jupiter.api.TestTemplate; +import org.junit.jupiter.api.Timeout; import java.lang.annotation.Documented; import java.lang.annotation.Retention; @@ -34,6 +35,7 @@ @Target({METHOD}) @Retention(RUNTIME) @TestTemplate +@Timeout(60) @Tag("integration") public @interface ClusterTest { Type[] types() default {}; diff --git a/core/src/test/java/kafka/test/annotation/ClusterTests.java b/core/src/test/java/kafka/test/annotation/ClusterTests.java index 1fd7008927a5f..083b56514c1c1 100644 --- a/core/src/test/java/kafka/test/annotation/ClusterTests.java +++ b/core/src/test/java/kafka/test/annotation/ClusterTests.java @@ -19,6 +19,7 @@ import org.junit.jupiter.api.Tag; import org.junit.jupiter.api.TestTemplate; +import 
org.junit.jupiter.api.Timeout; import java.lang.annotation.Documented; import java.lang.annotation.Retention; @@ -31,6 +32,7 @@ @Target({METHOD}) @Retention(RUNTIME) @TestTemplate +@Timeout(60) @Tag("integration") public @interface ClusterTests { ClusterTest[] value(); diff --git a/core/src/test/scala/integration/kafka/server/KafkaServerKRaftRegistrationTest.scala b/core/src/test/scala/integration/kafka/server/KafkaServerKRaftRegistrationTest.scala index adcb24a3b49f7..68dbcc03862b2 100644 --- a/core/src/test/scala/integration/kafka/server/KafkaServerKRaftRegistrationTest.scala +++ b/core/src/test/scala/integration/kafka/server/KafkaServerKRaftRegistrationTest.scala @@ -27,7 +27,6 @@ import org.apache.kafka.raft.QuorumConfig import org.apache.kafka.server.common.MetadataVersion import org.apache.kafka.server.config.{KRaftConfigs, ZkConfigs} import org.junit.jupiter.api.Assertions.{assertThrows, fail} -import org.junit.jupiter.api.Timeout import org.junit.jupiter.api.extension.ExtendWith import java.util.Optional @@ -41,7 +40,6 @@ import scala.jdk.CollectionConverters._ * failure paths is to use timeouts. See {@link unit.kafka.server.BrokerRegistrationRequestTest} for integration test * of just the broker registration path. */ -@Timeout(120) @ExtendWith(value = Array(classOf[ClusterTestExtensions])) class KafkaServerKRaftRegistrationTest { diff --git a/core/src/test/scala/unit/kafka/server/AllocateProducerIdsRequestTest.scala b/core/src/test/scala/unit/kafka/server/AllocateProducerIdsRequestTest.scala index 28e17ab349a75..e6efbd0fc77a1 100644 --- a/core/src/test/scala/unit/kafka/server/AllocateProducerIdsRequestTest.scala +++ b/core/src/test/scala/unit/kafka/server/AllocateProducerIdsRequestTest.scala @@ -27,10 +27,8 @@ import org.apache.kafka.common.protocol.Errors import org.apache.kafka.common.requests._ import org.apache.kafka.server.common.ProducerIdsBlock import org.junit.jupiter.api.Assertions.{assertEquals, assertTrue} -import org.junit.jupiter.api.Timeout import org.junit.jupiter.api.extension.ExtendWith -@Timeout(120) @ExtendWith(value = Array(classOf[ClusterTestExtensions])) @ClusterTestDefaults(types = Array(Type.KRAFT)) class AllocateProducerIdsRequestTest(cluster: ClusterInstance) { diff --git a/core/src/test/scala/unit/kafka/server/BrokerRegistrationRequestTest.scala b/core/src/test/scala/unit/kafka/server/BrokerRegistrationRequestTest.scala index cd05f19a08e0f..18c2e9c711be6 100644 --- a/core/src/test/scala/unit/kafka/server/BrokerRegistrationRequestTest.scala +++ b/core/src/test/scala/unit/kafka/server/BrokerRegistrationRequestTest.scala @@ -34,7 +34,6 @@ import org.apache.kafka.common.{Node, Uuid} import org.apache.kafka.server.{ControllerRequestCompletionHandler, NodeToControllerChannelManager} import org.apache.kafka.server.common.{Features, MetadataVersion} import org.junit.jupiter.api.Assertions.{assertEquals, assertThrows} -import org.junit.jupiter.api.Timeout import org.junit.jupiter.api.extension.ExtendWith import java.util @@ -44,7 +43,6 @@ import java.util.concurrent.{CompletableFuture, TimeUnit, TimeoutException} /** * This test simulates a broker registering with the KRaft quorum under different configurations. 
*/ -@Timeout(120) @ExtendWith(value = Array(classOf[ClusterTestExtensions])) class BrokerRegistrationRequestTest { diff --git a/core/src/test/scala/unit/kafka/server/ConsumerGroupDescribeRequestTest.scala b/core/src/test/scala/unit/kafka/server/ConsumerGroupDescribeRequestTest.scala index 55c9614a3817c..17a237aef2112 100644 --- a/core/src/test/scala/unit/kafka/server/ConsumerGroupDescribeRequestTest.scala +++ b/core/src/test/scala/unit/kafka/server/ConsumerGroupDescribeRequestTest.scala @@ -31,13 +31,11 @@ import org.apache.kafka.coordinator.group.GroupCoordinatorConfig import org.apache.kafka.security.authorizer.AclEntry import org.apache.kafka.server.common.Features import org.junit.jupiter.api.Assertions.assertEquals -import org.junit.jupiter.api.Timeout import org.junit.jupiter.api.extension.ExtendWith import java.lang.{Byte => JByte} import scala.jdk.CollectionConverters._ -@Timeout(120) @ExtendWith(value = Array(classOf[ClusterTestExtensions])) @ClusterTestDefaults(types = Array(Type.KRAFT), brokers = 1) class ConsumerGroupDescribeRequestTest(cluster: ClusterInstance) extends GroupCoordinatorBaseRequestTest(cluster) { diff --git a/core/src/test/scala/unit/kafka/server/ConsumerGroupHeartbeatRequestTest.scala b/core/src/test/scala/unit/kafka/server/ConsumerGroupHeartbeatRequestTest.scala index 907448ec711ff..4979080850547 100644 --- a/core/src/test/scala/unit/kafka/server/ConsumerGroupHeartbeatRequestTest.scala +++ b/core/src/test/scala/unit/kafka/server/ConsumerGroupHeartbeatRequestTest.scala @@ -30,13 +30,11 @@ import org.apache.kafka.common.requests.{ConsumerGroupHeartbeatRequest, Consumer import org.apache.kafka.coordinator.group.{GroupConfig, GroupCoordinatorConfig} import org.apache.kafka.server.common.Features import org.junit.jupiter.api.Assertions.{assertEquals, assertNotEquals, assertNotNull} -import org.junit.jupiter.api.Timeout import org.junit.jupiter.api.extension.ExtendWith import scala.collection.Map import scala.jdk.CollectionConverters._ -@Timeout(120) @ExtendWith(value = Array(classOf[ClusterTestExtensions])) class ConsumerGroupHeartbeatRequestTest(cluster: ClusterInstance) { diff --git a/core/src/test/scala/unit/kafka/server/ConsumerProtocolMigrationTest.scala b/core/src/test/scala/unit/kafka/server/ConsumerProtocolMigrationTest.scala index 9164cddd84c5b..d15f65d0d0c9b 100644 --- a/core/src/test/scala/unit/kafka/server/ConsumerProtocolMigrationTest.scala +++ b/core/src/test/scala/unit/kafka/server/ConsumerProtocolMigrationTest.scala @@ -25,10 +25,8 @@ import org.apache.kafka.coordinator.group.{Group, GroupCoordinatorConfig} import org.apache.kafka.coordinator.group.classic.ClassicGroupState import org.apache.kafka.coordinator.group.modern.consumer.ConsumerGroup.ConsumerGroupState import org.junit.jupiter.api.Assertions.assertEquals -import org.junit.jupiter.api.Timeout import org.junit.jupiter.api.extension.ExtendWith -@Timeout(120) @ExtendWith(value = Array(classOf[ClusterTestExtensions])) @ClusterTestDefaults(types = Array(Type.KRAFT)) class ConsumerProtocolMigrationTest(cluster: ClusterInstance) extends GroupCoordinatorBaseRequestTest(cluster) { diff --git a/core/src/test/scala/unit/kafka/server/DeleteGroupsRequestTest.scala b/core/src/test/scala/unit/kafka/server/DeleteGroupsRequestTest.scala index 70c97b132ace9..8a6a213a2582a 100644 --- a/core/src/test/scala/unit/kafka/server/DeleteGroupsRequestTest.scala +++ b/core/src/test/scala/unit/kafka/server/DeleteGroupsRequestTest.scala @@ -24,10 +24,8 @@ import org.apache.kafka.common.protocol.{ApiKeys, Errors} import 
org.apache.kafka.coordinator.group.GroupCoordinatorConfig import org.apache.kafka.coordinator.group.classic.ClassicGroupState import org.junit.jupiter.api.Assertions.{assertEquals, fail} -import org.junit.jupiter.api.Timeout import org.junit.jupiter.api.extension.ExtendWith -@Timeout(120) @ExtendWith(value = Array(classOf[ClusterTestExtensions])) @ClusterTestDefaults(types = Array(Type.KRAFT)) class DeleteGroupsRequestTest(cluster: ClusterInstance) extends GroupCoordinatorBaseRequestTest(cluster) { diff --git a/core/src/test/scala/unit/kafka/server/DescribeGroupsRequestTest.scala b/core/src/test/scala/unit/kafka/server/DescribeGroupsRequestTest.scala index da47204f67d1b..248683ab0e2a8 100644 --- a/core/src/test/scala/unit/kafka/server/DescribeGroupsRequestTest.scala +++ b/core/src/test/scala/unit/kafka/server/DescribeGroupsRequestTest.scala @@ -24,12 +24,10 @@ import org.apache.kafka.common.protocol.ApiKeys import org.apache.kafka.coordinator.group.GroupCoordinatorConfig import org.apache.kafka.coordinator.group.classic.ClassicGroupState import org.junit.jupiter.api.Assertions.assertEquals -import org.junit.jupiter.api.Timeout import org.junit.jupiter.api.extension.ExtendWith import scala.jdk.CollectionConverters._ -@Timeout(120) @ExtendWith(value = Array(classOf[ClusterTestExtensions])) @ClusterTestDefaults(types = Array(Type.KRAFT)) class DescribeGroupsRequestTest(cluster: ClusterInstance) extends GroupCoordinatorBaseRequestTest(cluster) { diff --git a/core/src/test/scala/unit/kafka/server/DescribeQuorumRequestTest.scala b/core/src/test/scala/unit/kafka/server/DescribeQuorumRequestTest.scala index 7d4ccb957b0bf..c5e153348e21c 100644 --- a/core/src/test/scala/unit/kafka/server/DescribeQuorumRequestTest.scala +++ b/core/src/test/scala/unit/kafka/server/DescribeQuorumRequestTest.scala @@ -24,13 +24,11 @@ import org.apache.kafka.common.protocol.{ApiKeys, Errors} import org.apache.kafka.common.requests.DescribeQuorumRequest.singletonRequest import org.apache.kafka.common.requests.{AbstractRequest, AbstractResponse, ApiVersionsRequest, ApiVersionsResponse, DescribeQuorumRequest, DescribeQuorumResponse} import org.junit.jupiter.api.Assertions._ -import org.junit.jupiter.api.Timeout import org.junit.jupiter.api.extension.ExtendWith import scala.jdk.CollectionConverters._ import scala.reflect.ClassTag -@Timeout(120) @ExtendWith(value = Array(classOf[ClusterTestExtensions])) @ClusterTestDefaults(types = Array(Type.KRAFT)) class DescribeQuorumRequestTest(cluster: ClusterInstance) { diff --git a/core/src/test/scala/unit/kafka/server/HeartbeatRequestTest.scala b/core/src/test/scala/unit/kafka/server/HeartbeatRequestTest.scala index db767d3fbf321..5a3aef317806d 100644 --- a/core/src/test/scala/unit/kafka/server/HeartbeatRequestTest.scala +++ b/core/src/test/scala/unit/kafka/server/HeartbeatRequestTest.scala @@ -26,14 +26,12 @@ import org.apache.kafka.common.message.SyncGroupRequestData import org.apache.kafka.common.protocol.{ApiKeys, Errors} import org.apache.kafka.coordinator.group.GroupCoordinatorConfig import org.apache.kafka.coordinator.group.classic.ClassicGroupState -import org.junit.jupiter.api.Timeout import org.junit.jupiter.api.extension.ExtendWith import java.util.Collections import scala.concurrent.ExecutionContext.Implicits.global import scala.concurrent.Future -@Timeout(120) @ExtendWith(value = Array(classOf[ClusterTestExtensions])) @ClusterTestDefaults(types = Array(Type.KRAFT)) class HeartbeatRequestTest(cluster: ClusterInstance) extends GroupCoordinatorBaseRequestTest(cluster) { diff 
--git a/core/src/test/scala/unit/kafka/server/JoinGroupRequestTest.scala b/core/src/test/scala/unit/kafka/server/JoinGroupRequestTest.scala index d905486707659..287f261f56cb2 100644 --- a/core/src/test/scala/unit/kafka/server/JoinGroupRequestTest.scala +++ b/core/src/test/scala/unit/kafka/server/JoinGroupRequestTest.scala @@ -28,7 +28,6 @@ import org.apache.kafka.common.protocol.{ApiKeys, Errors} import org.apache.kafka.coordinator.group.GroupCoordinatorConfig import org.apache.kafka.coordinator.group.classic.ClassicGroupState import org.junit.jupiter.api.Assertions.assertEquals -import org.junit.jupiter.api.Timeout import org.junit.jupiter.api.extension.ExtendWith import java.util.Collections @@ -37,7 +36,6 @@ import scala.concurrent.duration.Duration import scala.concurrent.{Await, Future} import scala.jdk.CollectionConverters._ -@Timeout(30) @ExtendWith(value = Array(classOf[ClusterTestExtensions])) class JoinGroupRequestTest(cluster: ClusterInstance) extends GroupCoordinatorBaseRequestTest(cluster) { @ClusterTest(types = Array(Type.KRAFT), serverProperties = Array( diff --git a/core/src/test/scala/unit/kafka/server/LeaveGroupRequestTest.scala b/core/src/test/scala/unit/kafka/server/LeaveGroupRequestTest.scala index 6b00870a87e86..cd1bd9b71050a 100644 --- a/core/src/test/scala/unit/kafka/server/LeaveGroupRequestTest.scala +++ b/core/src/test/scala/unit/kafka/server/LeaveGroupRequestTest.scala @@ -24,10 +24,8 @@ import org.apache.kafka.common.requests.JoinGroupRequest import org.apache.kafka.coordinator.group.GroupCoordinatorConfig import org.apache.kafka.coordinator.group.classic.ClassicGroupState import org.junit.jupiter.api.Assertions.assertEquals -import org.junit.jupiter.api.Timeout import org.junit.jupiter.api.extension.ExtendWith -@Timeout(120) @ExtendWith(value = Array(classOf[ClusterTestExtensions])) @ClusterTestDefaults(types = Array(Type.KRAFT)) class LeaveGroupRequestTest(cluster: ClusterInstance) extends GroupCoordinatorBaseRequestTest(cluster) { diff --git a/core/src/test/scala/unit/kafka/server/ListGroupsRequestTest.scala b/core/src/test/scala/unit/kafka/server/ListGroupsRequestTest.scala index 765014f517e7c..e2af044a00229 100644 --- a/core/src/test/scala/unit/kafka/server/ListGroupsRequestTest.scala +++ b/core/src/test/scala/unit/kafka/server/ListGroupsRequestTest.scala @@ -25,10 +25,8 @@ import org.apache.kafka.coordinator.group.classic.ClassicGroupState import org.apache.kafka.coordinator.group.modern.consumer.ConsumerGroup.ConsumerGroupState import org.apache.kafka.coordinator.group.{Group, GroupCoordinatorConfig} import org.junit.jupiter.api.Assertions.{assertEquals, fail} -import org.junit.jupiter.api.Timeout import org.junit.jupiter.api.extension.ExtendWith -@Timeout(30) @ExtendWith(value = Array(classOf[ClusterTestExtensions])) @ClusterTestDefaults(types = Array(Type.KRAFT)) class ListGroupsRequestTest(cluster: ClusterInstance) extends GroupCoordinatorBaseRequestTest(cluster) { diff --git a/core/src/test/scala/unit/kafka/server/OffsetCommitRequestTest.scala b/core/src/test/scala/unit/kafka/server/OffsetCommitRequestTest.scala index 69f31351d177a..5232480172eaf 100644 --- a/core/src/test/scala/unit/kafka/server/OffsetCommitRequestTest.scala +++ b/core/src/test/scala/unit/kafka/server/OffsetCommitRequestTest.scala @@ -22,10 +22,8 @@ import kafka.test.junit.ClusterTestExtensions import org.apache.kafka.common.protocol.{ApiKeys, Errors} import org.apache.kafka.coordinator.group.GroupCoordinatorConfig import org.junit.jupiter.api.Assertions.fail -import 
org.junit.jupiter.api.Timeout import org.junit.jupiter.api.extension.ExtendWith -@Timeout(120) @ExtendWith(value = Array(classOf[ClusterTestExtensions])) @ClusterTestDefaults(types = Array(Type.KRAFT)) class OffsetCommitRequestTest(cluster: ClusterInstance) extends GroupCoordinatorBaseRequestTest(cluster) { diff --git a/core/src/test/scala/unit/kafka/server/OffsetDeleteRequestTest.scala b/core/src/test/scala/unit/kafka/server/OffsetDeleteRequestTest.scala index af2a7b9def4c8..46ff47dcc30fc 100644 --- a/core/src/test/scala/unit/kafka/server/OffsetDeleteRequestTest.scala +++ b/core/src/test/scala/unit/kafka/server/OffsetDeleteRequestTest.scala @@ -22,10 +22,8 @@ import kafka.test.junit.ClusterTestExtensions import org.apache.kafka.common.protocol.{ApiKeys, Errors} import org.apache.kafka.coordinator.group.GroupCoordinatorConfig import org.junit.jupiter.api.Assertions.fail -import org.junit.jupiter.api.Timeout import org.junit.jupiter.api.extension.ExtendWith -@Timeout(120) @ExtendWith(value = Array(classOf[ClusterTestExtensions])) @ClusterTestDefaults(types = Array(Type.KRAFT)) class OffsetDeleteRequestTest(cluster: ClusterInstance) extends GroupCoordinatorBaseRequestTest(cluster) { diff --git a/core/src/test/scala/unit/kafka/server/OffsetFetchRequestTest.scala b/core/src/test/scala/unit/kafka/server/OffsetFetchRequestTest.scala index 0e2d22b4d84b4..917fcb1460ecf 100644 --- a/core/src/test/scala/unit/kafka/server/OffsetFetchRequestTest.scala +++ b/core/src/test/scala/unit/kafka/server/OffsetFetchRequestTest.scala @@ -25,12 +25,10 @@ import org.apache.kafka.common.message.OffsetFetchResponseData import org.apache.kafka.common.protocol.{ApiKeys, Errors} import org.apache.kafka.coordinator.group.GroupCoordinatorConfig import org.junit.jupiter.api.Assertions.{assertEquals, fail} -import org.junit.jupiter.api.Timeout import org.junit.jupiter.api.extension.ExtendWith import scala.jdk.CollectionConverters._ -@Timeout(120) @ExtendWith(value = Array(classOf[ClusterTestExtensions])) @ClusterTestDefaults(types = Array(Type.KRAFT)) class OffsetFetchRequestTest(cluster: ClusterInstance) extends GroupCoordinatorBaseRequestTest(cluster) { diff --git a/core/src/test/scala/unit/kafka/server/SyncGroupRequestTest.scala b/core/src/test/scala/unit/kafka/server/SyncGroupRequestTest.scala index 760c237293034..6f50ad612bc50 100644 --- a/core/src/test/scala/unit/kafka/server/SyncGroupRequestTest.scala +++ b/core/src/test/scala/unit/kafka/server/SyncGroupRequestTest.scala @@ -26,7 +26,6 @@ import org.apache.kafka.common.message.SyncGroupRequestData import org.apache.kafka.common.protocol.{ApiKeys, Errors} import org.apache.kafka.coordinator.group.GroupCoordinatorConfig import org.apache.kafka.coordinator.group.classic.ClassicGroupState -import org.junit.jupiter.api.Timeout import org.junit.jupiter.api.extension.ExtendWith import java.util.Collections @@ -34,7 +33,6 @@ import scala.concurrent.ExecutionContext.Implicits.global import scala.concurrent.duration.Duration import scala.concurrent.{Await, Future} -@Timeout(30) @ExtendWith(value = Array(classOf[ClusterTestExtensions])) @ClusterTestDefaults(types = Array(Type.KRAFT)) class SyncGroupRequestTest(cluster: ClusterInstance) extends GroupCoordinatorBaseRequestTest(cluster) { diff --git a/tools/src/test/java/org/apache/kafka/tools/ClusterToolTest.java b/tools/src/test/java/org/apache/kafka/tools/ClusterToolTest.java index 5f85fd92181bb..1c50b731e4674 100644 --- a/tools/src/test/java/org/apache/kafka/tools/ClusterToolTest.java +++ 
b/tools/src/test/java/org/apache/kafka/tools/ClusterToolTest.java @@ -26,7 +26,6 @@ import org.apache.kafka.common.errors.UnsupportedEndpointTypeException; import org.junit.jupiter.api.Test; -import org.junit.jupiter.api.Timeout; import org.junit.jupiter.api.extension.ExtendWith; import java.io.ByteArrayOutputStream; @@ -40,7 +39,6 @@ import static org.junit.jupiter.api.Assertions.assertThrows; import static org.junit.jupiter.api.Assertions.assertTrue; -@Timeout(value = 60) @ExtendWith(value = ClusterTestExtensions.class) public class ClusterToolTest { From 9a11898c8c5f6a96dcc6f52a541e44e767375853 Mon Sep 17 00:00:00 2001 From: bboyleonp666 <55445715+bboyleonp666@users.noreply.github.com> Date: Sat, 21 Sep 2024 05:43:24 +0800 Subject: [PATCH 2/5] KAFKA-17567 Remove TestTruncate (#17234) Reviewers: Chia-Ping Tsai --- .../test/scala/other/kafka/TestTruncate.scala | 42 ------------------- 1 file changed, 42 deletions(-) delete mode 100644 core/src/test/scala/other/kafka/TestTruncate.scala diff --git a/core/src/test/scala/other/kafka/TestTruncate.scala b/core/src/test/scala/other/kafka/TestTruncate.scala deleted file mode 100644 index 4651bf8ccf941..0000000000000 --- a/core/src/test/scala/other/kafka/TestTruncate.scala +++ /dev/null @@ -1,42 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package kafka - -import kafka.utils.TestUtils - -import java.nio._ -import java.nio.channels.FileChannel -import java.nio.file.StandardOpenOption - -/* This code tests the correct function of java's FileChannel.truncate--some platforms don't work. */ -object TestTruncate { - - def main(args: Array[String]): Unit = { - val name = TestUtils.tempFile("kafka", ".test") - name.deleteOnExit() - val file = FileChannel.open(name.toPath, StandardOpenOption.READ, StandardOpenOption.WRITE) - val buffer = ByteBuffer.allocate(12) - buffer.putInt(4).putInt(4).putInt(4) - buffer.rewind() - file.write(buffer) - println("position prior to truncate: " + file.position) - file.truncate(4) - println("position after truncate to 4: " + file.position) - } - -} From 2489cf586f81d4ddef25dc409c03dfc3c4ccad24 Mon Sep 17 00:00:00 2001 From: "Matthias J. 
Sax" Date: Fri, 20 Sep 2024 15:55:03 -0700 Subject: [PATCH 3/5] KAFKA-16331: remove EOSv1 config from StreamsConfig (#17170) Reviewers: Bill Bejeck --- .../apache/kafka/streams/StreamsConfig.java | 55 +--- .../streams/internals/StreamsConfigUtils.java | 19 +- .../processor/internals/StreamThread.java | 10 +- .../kafka/streams/StreamsConfigTest.java | 292 +----------------- .../integration/EosIntegrationTest.java | 2 +- .../internals/ActiveTaskCreatorTest.java | 147 --------- .../processor/internals/StandbyTaskTest.java | 26 -- .../processor/internals/StreamTaskTest.java | 25 -- .../processor/internals/StreamThreadTest.java | 39 --- .../internals/StreamsProducerTest.java | 173 +++++------ .../processor/internals/TaskExecutorTest.java | 29 +- .../processor/internals/TaskManagerTest.java | 74 +---- .../kafka/streams/TopologyTestDriver.java | 18 +- 13 files changed, 112 insertions(+), 797 deletions(-) diff --git a/streams/src/main/java/org/apache/kafka/streams/StreamsConfig.java b/streams/src/main/java/org/apache/kafka/streams/StreamsConfig.java index de83199a6f3de..dc6df24cd3eb2 100644 --- a/streams/src/main/java/org/apache/kafka/streams/StreamsConfig.java +++ b/streams/src/main/java/org/apache/kafka/streams/StreamsConfig.java @@ -138,8 +138,7 @@ * * * If {@link #PROCESSING_GUARANTEE_CONFIG "processing.guarantee"} is set to {@link #EXACTLY_ONCE_V2 "exactly_once_v2"}, - * {@link #EXACTLY_ONCE "exactly_once"} (deprecated), or {@link #EXACTLY_ONCE_BETA "exactly_once_beta"} (deprecated), Kafka Streams does not - * allow users to overwrite the following properties (Streams setting shown in parentheses): + * Kafka Streams does not allow users to overwrite the following properties (Streams setting shown in parentheses): *
<ul> *   <li>{@link ConsumerConfig#ISOLATION_LEVEL_CONFIG "isolation.level"} (read_committed) - Consumers will always read committed data only</li> *   <li>{@link ProducerConfig#ENABLE_IDEMPOTENCE_CONFIG "enable.idempotence"} (true) - Producer will always have idempotency enabled</li> @@ -431,34 +430,8 @@ public class StreamsConfig extends AbstractConfig { /** * Config value for parameter {@link #PROCESSING_GUARANTEE_CONFIG "processing.guarantee"} for exactly-once processing guarantees. - * <p> - * Enabling exactly-once processing semantics requires broker version 0.11.0 or higher. - * If you enable this feature Kafka Streams will use more resources (like broker connections) - * compared to {@link #AT_LEAST_ONCE "at_least_once"} and {@link #EXACTLY_ONCE_V2 "exactly_once_v2"}. - * - * @deprecated since 3.0.0, will be removed in 4.0. Use {@link #EXACTLY_ONCE_V2 "exactly_once_v2"} instead. - */ - @SuppressWarnings("WeakerAccess") - @Deprecated - public static final String EXACTLY_ONCE = "exactly_once"; - - /** - * Config value for parameter {@link #PROCESSING_GUARANTEE_CONFIG "processing.guarantee"} for exactly-once processing guarantees. - * <p> - * Enabling exactly-once (beta) requires broker version 2.5 or higher. - * If you enable this feature Kafka Streams will use fewer resources (like broker connections) - * compared to the {@link #EXACTLY_ONCE} (deprecated) case. - * - * @deprecated since 3.0.0, will be removed in 4.0. Use {@link #EXACTLY_ONCE_V2 "exactly_once_v2"} instead. - */ - @SuppressWarnings("WeakerAccess") - @Deprecated - public static final String EXACTLY_ONCE_BETA = "exactly_once_beta"; - - /** - * Config value for parameter {@link #PROCESSING_GUARANTEE_CONFIG "processing.guarantee"} for exactly-once processing guarantees. - * <p> - * Enabling exactly-once-v2 requires broker version 2.5 or higher. + * <p>

    Enabling exactly-once-v2 requires broker version 2.5 or higher. */ @SuppressWarnings("WeakerAccess") public static final String EXACTLY_ONCE_V2 = "exactly_once_v2"; @@ -466,6 +439,7 @@ public class StreamsConfig extends AbstractConfig { public static final String RACK_AWARE_ASSIGNMENT_STRATEGY_NONE = "none"; public static final String RACK_AWARE_ASSIGNMENT_STRATEGY_MIN_TRAFFIC = "min_traffic"; public static final String RACK_AWARE_ASSIGNMENT_STRATEGY_BALANCE_SUBTOPOLOGY = "balance_subtopology"; + /** * Config value for parameter {@link #BUILT_IN_METRICS_VERSION_CONFIG "built.in.metrics.version"} for the latest built-in metrics version. */ @@ -522,7 +496,7 @@ public class StreamsConfig extends AbstractConfig { private static final String COMMIT_INTERVAL_MS_DOC = "The frequency in milliseconds with which to commit processing progress." + " For at-least-once processing, committing means to save the position (ie, offsets) of the processor." + " For exactly-once processing, it means to commit the transaction which includes to save the position and to make the committed data in the output topic visible to consumers with isolation level read_committed." + - " (Note, if processing.guarantee is set to " + EXACTLY_ONCE_V2 + ", " + EXACTLY_ONCE + ",the default value is " + EOS_DEFAULT_COMMIT_INTERVAL_MS + "," + + " (Note, if processing.guarantee is set to " + EXACTLY_ONCE_V2 + ", the default value is " + EOS_DEFAULT_COMMIT_INTERVAL_MS + "," + " otherwise the default value is " + DEFAULT_COMMIT_INTERVAL_MS + "."; /** {@code connections.max.idle.ms} */ @@ -694,8 +668,6 @@ public class StreamsConfig extends AbstractConfig { private static final String PROCESSING_GUARANTEE_DOC = "The processing guarantee that should be used. " + "Possible values are " + AT_LEAST_ONCE + " (default) " + "and " + EXACTLY_ONCE_V2 + " (requires brokers version 2.5 or higher). " + - "Deprecated options are " + EXACTLY_ONCE + " (requires brokers version 0.11.0 or higher) " + - "and " + EXACTLY_ONCE_BETA + " (requires brokers version 2.5 or higher). " + "Note that exactly-once processing requires a cluster of at least three brokers by default what is the " + "recommended setting for production; for development you can change this, by adjusting broker setting " + "transaction.state.log.replication.factor and transaction.state.log.min.isr."; @@ -985,7 +957,7 @@ public class StreamsConfig extends AbstractConfig { .define(PROCESSING_GUARANTEE_CONFIG, Type.STRING, AT_LEAST_ONCE, - in(AT_LEAST_ONCE, EXACTLY_ONCE, EXACTLY_ONCE_BETA, EXACTLY_ONCE_V2), + in(AT_LEAST_ONCE, EXACTLY_ONCE_V2), Importance.MEDIUM, PROCESSING_GUARANTEE_DOC) .define(PRODUCTION_EXCEPTION_HANDLER_CLASS_CONFIG, @@ -1455,18 +1427,6 @@ protected StreamsConfig(final Map props, final boolean doLog) { super(CONFIG, props, doLog); eosEnabled = StreamsConfigUtils.eosEnabled(this); - - final String processingModeConfig = getString(StreamsConfig.PROCESSING_GUARANTEE_CONFIG); - if (processingModeConfig.equals(EXACTLY_ONCE)) { - log.warn("Configuration parameter `{}` is deprecated and will be removed in the 4.0.0 release. " + - "Please use `{}` instead. Note that this requires broker version 2.5+ so you should prepare " - + "to upgrade your brokers if necessary.", EXACTLY_ONCE, EXACTLY_ONCE_V2); - } - if (processingModeConfig.equals(EXACTLY_ONCE_BETA)) { - log.warn("Configuration parameter `{}` is deprecated and will be removed in the 4.0.0 release. 
" + - "Please use `{}` instead.", EXACTLY_ONCE_BETA, EXACTLY_ONCE_V2); - } - if (eosEnabled) { verifyEOSTransactionTimeoutCompatibility(); } @@ -1788,11 +1748,6 @@ public Map getProducerConfigs(final String clientId) { props.putAll(getClientCustomProps()); props.putAll(clientProvidedProps); - // When using EOS alpha, stream should auto-downgrade the transactional commit protocol to be compatible with older brokers. - if (StreamsConfigUtils.processingMode(this) == StreamsConfigUtils.ProcessingMode.EXACTLY_ONCE_ALPHA) { - props.put("internal.auto.downgrade.txn.commit", true); - } - props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, originals().get(BOOTSTRAP_SERVERS_CONFIG)); // add client id with stream client id prefix props.put(CommonClientConfigs.CLIENT_ID_CONFIG, clientId); diff --git a/streams/src/main/java/org/apache/kafka/streams/internals/StreamsConfigUtils.java b/streams/src/main/java/org/apache/kafka/streams/internals/StreamsConfigUtils.java index d0e92abc47138..4ac9792c2288a 100644 --- a/streams/src/main/java/org/apache/kafka/streams/internals/StreamsConfigUtils.java +++ b/streams/src/main/java/org/apache/kafka/streams/internals/StreamsConfigUtils.java @@ -28,11 +28,11 @@ public class StreamsConfigUtils { private static final Logger LOG = LoggerFactory.getLogger(StreamsConfigUtils.class); - @SuppressWarnings("deprecation") public enum ProcessingMode { AT_LEAST_ONCE(StreamsConfig.AT_LEAST_ONCE), - EXACTLY_ONCE_ALPHA(StreamsConfig.EXACTLY_ONCE), + // TODO cleanup + EXACTLY_ONCE_ALPHA("exactly_once"), EXACTLY_ONCE_V2(StreamsConfig.EXACTLY_ONCE_V2); @@ -42,12 +42,12 @@ public enum ProcessingMode { this.name = name; } } - - @SuppressWarnings("deprecation") + + // TODO cleanup public static ProcessingMode processingMode(final StreamsConfig config) { - if (StreamsConfig.EXACTLY_ONCE.equals(config.getString(StreamsConfig.PROCESSING_GUARANTEE_CONFIG))) { + if ("exactly_once".equals(config.getString(StreamsConfig.PROCESSING_GUARANTEE_CONFIG))) { return ProcessingMode.EXACTLY_ONCE_ALPHA; - } else if (StreamsConfig.EXACTLY_ONCE_BETA.equals(config.getString(StreamsConfig.PROCESSING_GUARANTEE_CONFIG))) { + } else if ("exactly_once_beta".equals(config.getString(StreamsConfig.PROCESSING_GUARANTEE_CONFIG))) { return ProcessingMode.EXACTLY_ONCE_V2; } else if (StreamsConfig.EXACTLY_ONCE_V2.equals(config.getString(StreamsConfig.PROCESSING_GUARANTEE_CONFIG))) { return ProcessingMode.EXACTLY_ONCE_V2; @@ -56,12 +56,12 @@ public static ProcessingMode processingMode(final StreamsConfig config) { } } - @SuppressWarnings("deprecation") + // TODO cleanup public static String processingModeString(final ProcessingMode processingMode) { if (processingMode == ProcessingMode.EXACTLY_ONCE_V2) { return StreamsConfig.EXACTLY_ONCE_V2; } else if (processingMode == ProcessingMode.EXACTLY_ONCE_ALPHA) { - return StreamsConfig.EXACTLY_ONCE; + return "exactly_once"; } else { return StreamsConfig.AT_LEAST_ONCE; } @@ -72,8 +72,7 @@ public static boolean eosEnabled(final StreamsConfig config) { } public static boolean eosEnabled(final ProcessingMode processingMode) { - return processingMode == ProcessingMode.EXACTLY_ONCE_ALPHA || - processingMode == ProcessingMode.EXACTLY_ONCE_V2; + return processingMode == ProcessingMode.EXACTLY_ONCE_V2; } @SuppressWarnings("deprecation") diff --git a/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamThread.java b/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamThread.java index 7ee4bb54edc7c..864f7e3ed26f5 100644 --- 
a/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamThread.java +++ b/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamThread.java @@ -685,7 +685,6 @@ public void run() { * @throws IllegalStateException If store gets registered after initialized is already finished * @throws StreamsException if the store's change log does not contain the partition */ - @SuppressWarnings("deprecation") // Needed to include StreamsConfig.EXACTLY_ONCE_BETA in error log for UnsupportedVersionException boolean runLoop() { subscribeConsumer(); @@ -931,14 +930,14 @@ public void resizeCache(final long size) { /** * One iteration of a thread includes the following steps: - * + *

* <p> * 1. poll records from main consumer and add to buffer; * 2. restore from restore consumer and update standby tasks if necessary; * 3. process active tasks from the buffers; * 4. punctuate active tasks if necessary; * 5. commit all tasks if necessary; * - * Among them, step 3/4/5 is done in batches in which we try to process as much as possible while trying to + * <p> Among them, step 3/4/5 is done in batches in which we try to process as much as possible while trying to * stop iteration to call the next iteration when it's close to the next main consumer's poll deadline * * @throws IllegalStateException If store gets registered after initialized is already finished @@ -1083,7 +1082,7 @@ void runOnceWithoutProcessingThreads() { /** * One iteration of a thread includes the following steps: - * + * <p>
    * 1. poll records from main consumer and add to buffer; * 2. check the task manager for any exceptions to be handled * 3. commit all tasks if necessary; @@ -1367,11 +1366,10 @@ public void signalResume() { /** * Try to commit all active tasks owned by this thread. * - * Visible for testing. - * * @throws TaskMigratedException if committing offsets failed (non-EOS) * or if the task producer got fenced (EOS) */ + // visible for testing int maybeCommit() { final int committed; if (now - lastCommitMs > commitTimeMs) { diff --git a/streams/src/test/java/org/apache/kafka/streams/StreamsConfigTest.java b/streams/src/test/java/org/apache/kafka/streams/StreamsConfigTest.java index 366015368a958..b751e9c5cc6b1 100644 --- a/streams/src/test/java/org/apache/kafka/streams/StreamsConfigTest.java +++ b/streams/src/test/java/org/apache/kafka/streams/StreamsConfigTest.java @@ -28,6 +28,7 @@ import org.apache.kafka.common.serialization.Serde; import org.apache.kafka.common.serialization.Serdes; import org.apache.kafka.common.serialization.Serializer; +import org.apache.kafka.common.serialization.StringSerializer; import org.apache.kafka.common.utils.LogCaptureAppender; import org.apache.kafka.streams.errors.DefaultProductionExceptionHandler; import org.apache.kafka.streams.errors.LogAndContinueExceptionHandler; @@ -65,8 +66,6 @@ import static org.apache.kafka.streams.StreamsConfig.AT_LEAST_ONCE; import static org.apache.kafka.streams.StreamsConfig.DEFAULT_DSL_STORE_CONFIG; import static org.apache.kafka.streams.StreamsConfig.DSL_STORE_SUPPLIERS_CLASS_CONFIG; -import static org.apache.kafka.streams.StreamsConfig.EXACTLY_ONCE; -import static org.apache.kafka.streams.StreamsConfig.EXACTLY_ONCE_BETA; import static org.apache.kafka.streams.StreamsConfig.EXACTLY_ONCE_V2; import static org.apache.kafka.streams.StreamsConfig.MAX_RACK_AWARE_ASSIGNMENT_TAG_KEY_LENGTH; import static org.apache.kafka.streams.StreamsConfig.MAX_RACK_AWARE_ASSIGNMENT_TAG_VALUE_LENGTH; @@ -81,7 +80,6 @@ import static org.apache.kafka.streams.internals.StreamsConfigUtils.totalCacheSize; import static org.apache.kafka.test.StreamsTestUtils.getStreamsConfig; import static org.hamcrest.CoreMatchers.containsString; -import static org.hamcrest.CoreMatchers.hasItem; import static org.hamcrest.CoreMatchers.is; import static org.hamcrest.CoreMatchers.nullValue; import static org.hamcrest.MatcherAssert.assertThat; @@ -107,8 +105,8 @@ public class StreamsConfigTest { public void setUp() { props.put(StreamsConfig.APPLICATION_ID_CONFIG, "streams-config-test"); props.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092"); - props.put(StreamsConfig.DEFAULT_KEY_SERDE_CLASS_CONFIG, Serdes.String().getClass().getName()); - props.put(StreamsConfig.DEFAULT_VALUE_SERDE_CLASS_CONFIG, Serdes.String().getClass().getName()); + props.put(StreamsConfig.DEFAULT_KEY_SERDE_CLASS_CONFIG, Serdes.StringSerde.class.getName()); + props.put(StreamsConfig.DEFAULT_VALUE_SERDE_CLASS_CONFIG, Serdes.StringSerde.class.getName()); props.put("key.deserializer.encoding", StandardCharsets.UTF_8.name()); props.put("value.deserializer.encoding", StandardCharsets.UTF_16.name()); streamsConfig = new StreamsConfig(props); @@ -238,7 +236,7 @@ public void defaultSerdeShouldBeConfigured() { final Map serializerConfigs = new HashMap<>(); serializerConfigs.put("key.serializer.encoding", StandardCharsets.UTF_8.name()); serializerConfigs.put("value.serializer.encoding", StandardCharsets.UTF_16.name()); - final Serializer serializer = Serdes.String().serializer(); + final Serializer 
serializer = new StringSerializer(); final String str = "my string for testing"; final String topic = "my topic"; @@ -424,34 +422,16 @@ public void shouldOverrideStreamsDefaultProducerConfigs() { assertEquals("30000", producerConfigs.get(ProducerConfig.TRANSACTION_TIMEOUT_CONFIG)); } - @SuppressWarnings("deprecation") - @Test - public void shouldThrowIfTransactionTimeoutSmallerThanCommitIntervalForEOSAlpha() { - assertThrows(IllegalArgumentException.class, - () -> testTransactionTimeoutSmallerThanCommitInterval(EXACTLY_ONCE)); - } - - @SuppressWarnings("deprecation") - @Test - public void shouldThrowIfTransactionTimeoutSmallerThanCommitIntervalForEOSBeta() { - assertThrows(IllegalArgumentException.class, - () -> testTransactionTimeoutSmallerThanCommitInterval(EXACTLY_ONCE_BETA)); - } - @Test public void shouldNotThrowIfTransactionTimeoutSmallerThanCommitIntervalForAtLeastOnce() { - testTransactionTimeoutSmallerThanCommitInterval(AT_LEAST_ONCE); - } - - private void testTransactionTimeoutSmallerThanCommitInterval(final String processingGuarantee) { - props.put(StreamsConfig.PROCESSING_GUARANTEE_CONFIG, processingGuarantee); + props.put(StreamsConfig.PROCESSING_GUARANTEE_CONFIG, AT_LEAST_ONCE); props.put(StreamsConfig.COMMIT_INTERVAL_MS_CONFIG, 10000L); props.put(StreamsConfig.producerPrefix(ProducerConfig.TRANSACTION_TIMEOUT_CONFIG), 3000); new StreamsConfig(props); } @Test - public void shouldOverrideStreamsDefaultConsumerConifgsOnRestoreConsumer() { + public void shouldOverrideStreamsDefaultConsumerConfigsOnRestoreConsumer() { props.put(StreamsConfig.consumerPrefix(ConsumerConfig.MAX_POLL_RECORDS_CONFIG), "10"); final StreamsConfig streamsConfig = new StreamsConfig(props); final Map consumerConfigs = streamsConfig.getRestoreConsumerConfigs(clientId); @@ -568,24 +548,6 @@ public void shouldNotSetInternalThrowOnFetchStableOffsetUnsupportedConfigToFalse assertThat(consumerConfigs.get("internal.throw.on.fetch.stable.offset.unsupported"), is(nullValue())); } - @SuppressWarnings("deprecation") - @Test - public void shouldNotSetInternalThrowOnFetchStableOffsetUnsupportedConfigToFalseInConsumerForEosAlpha() { - props.put(StreamsConfig.PROCESSING_GUARANTEE_CONFIG, EXACTLY_ONCE); - final StreamsConfig streamsConfig = new StreamsConfig(props); - final Map consumerConfigs = streamsConfig.getMainConsumerConfigs(groupId, clientId, threadIdx); - assertThat(consumerConfigs.get("internal.throw.on.fetch.stable.offset.unsupported"), is(nullValue())); - } - - @SuppressWarnings("deprecation") - @Test - public void shouldNotSetInternalThrowOnFetchStableOffsetUnsupportedConfigToFalseInConsumerForEosBeta() { - props.put(StreamsConfig.PROCESSING_GUARANTEE_CONFIG, EXACTLY_ONCE_BETA); - final StreamsConfig streamsConfig = new StreamsConfig(props); - final Map consumerConfigs = streamsConfig.getMainConsumerConfigs(groupId, clientId, threadIdx); - assertThat(consumerConfigs.get("internal.throw.on.fetch.stable.offset.unsupported"), is(true)); - } - @Test public void shouldNotSetInternalThrowOnFetchStableOffsetUnsupportedConfigToFalseInConsumerForEosV2() { props.put(StreamsConfig.PROCESSING_GUARANTEE_CONFIG, EXACTLY_ONCE_V2); @@ -594,30 +556,6 @@ public void shouldNotSetInternalThrowOnFetchStableOffsetUnsupportedConfigToFalse assertThat(consumerConfigs.get("internal.throw.on.fetch.stable.offset.unsupported"), is(true)); } - @Test - public void shouldNotSetInternalAutoDowngradeTxnCommitToTrueInProducerForEosDisabled() { - final Map producerConfigs = streamsConfig.getProducerConfigs(clientId); - 
assertThat(producerConfigs.get("internal.auto.downgrade.txn.commit"), is(nullValue())); - } - - @SuppressWarnings("deprecation") - @Test - public void shouldSetInternalAutoDowngradeTxnCommitToTrueInProducerForEosAlpha() { - props.put(StreamsConfig.PROCESSING_GUARANTEE_CONFIG, EXACTLY_ONCE); - final StreamsConfig streamsConfig = new StreamsConfig(props); - final Map producerConfigs = streamsConfig.getProducerConfigs(clientId); - assertThat(producerConfigs.get("internal.auto.downgrade.txn.commit"), is(true)); - } - - @SuppressWarnings("deprecation") - @Test - public void shouldNotSetInternalAutoDowngradeTxnCommitToTrueInProducerForEosBeta() { - props.put(StreamsConfig.PROCESSING_GUARANTEE_CONFIG, EXACTLY_ONCE_BETA); - final StreamsConfig streamsConfig = new StreamsConfig(props); - final Map producerConfigs = streamsConfig.getProducerConfigs(clientId); - assertThat(producerConfigs.get("internal.auto.downgrade.txn.commit"), is(nullValue())); - } - @Test public void shouldNotSetInternalAutoDowngradeTxnCommitToTrueInProducerForEosV2() { props.put(StreamsConfig.PROCESSING_GUARANTEE_CONFIG, EXACTLY_ONCE_V2); @@ -633,20 +571,6 @@ public void shouldAcceptAtLeastOnce() { new StreamsConfig(props); } - @Test - public void shouldAcceptExactlyOnce() { - // don't use `StreamsConfig.EXACTLY_ONCE` to actually do a useful test - props.put(StreamsConfig.PROCESSING_GUARANTEE_CONFIG, "exactly_once"); - new StreamsConfig(props); - } - - @Test - public void shouldAcceptExactlyOnceBeta() { - // don't use `StreamsConfig.EXACTLY_ONCE_BETA` to actually do a useful test - props.put(StreamsConfig.PROCESSING_GUARANTEE_CONFIG, "exactly_once_beta"); - new StreamsConfig(props); - } - @Test public void shouldThrowExceptionIfNotAtLeastOnceOrExactlyOnce() { props.put(StreamsConfig.PROCESSING_GUARANTEE_CONFIG, "bad_value"); @@ -677,27 +601,9 @@ public void shouldThrowIfBuiltInMetricsVersionInvalid() { ); } - @SuppressWarnings("deprecation") - @Test - public void shouldResetToDefaultIfConsumerIsolationLevelIsOverriddenIfEosAlphaEnabled() { - props.put(StreamsConfig.PROCESSING_GUARANTEE_CONFIG, EXACTLY_ONCE); - shouldResetToDefaultIfConsumerIsolationLevelIsOverriddenIfEosEnabled(); - } - - @SuppressWarnings("deprecation") - @Test - public void shouldResetToDefaultIfConsumerIsolationLevelIsOverriddenIfEosBetaEnabled() { - props.put(StreamsConfig.PROCESSING_GUARANTEE_CONFIG, EXACTLY_ONCE_BETA); - shouldResetToDefaultIfConsumerIsolationLevelIsOverriddenIfEosEnabled(); - } - @Test public void shouldResetToDefaultIfConsumerIsolationLevelIsOverriddenIfEosV2Enabled() { props.put(StreamsConfig.PROCESSING_GUARANTEE_CONFIG, EXACTLY_ONCE_V2); - shouldResetToDefaultIfConsumerIsolationLevelIsOverriddenIfEosEnabled(); - } - - private void shouldResetToDefaultIfConsumerIsolationLevelIsOverriddenIfEosEnabled() { props.put(ConsumerConfig.ISOLATION_LEVEL_CONFIG, "anyValue"); final StreamsConfig streamsConfig = new StreamsConfig(props); final Map consumerConfigs = streamsConfig.getMainConsumerConfigs(groupId, clientId, threadIdx); @@ -718,27 +624,9 @@ public void shouldAllowSettingConsumerIsolationLevelIfEosDisabled() { ); } - @SuppressWarnings("deprecation") - @Test - public void shouldResetToDefaultIfProducerEnableIdempotenceIsOverriddenIfEosAlphaEnabled() { - props.put(StreamsConfig.PROCESSING_GUARANTEE_CONFIG, EXACTLY_ONCE); - shouldResetToDefaultIfProducerEnableIdempotenceIsOverriddenIfEosEnabled(); - } - - @SuppressWarnings("deprecation") - @Test - public void shouldResetToDefaultIfProducerEnableIdempotenceIsOverriddenIfEosBetaEnabled() { - 
props.put(StreamsConfig.PROCESSING_GUARANTEE_CONFIG, EXACTLY_ONCE_BETA); - shouldResetToDefaultIfProducerEnableIdempotenceIsOverriddenIfEosEnabled(); - } - @Test public void shouldResetToDefaultIfProducerEnableIdempotenceIsOverriddenIfEosV2Enabled() { props.put(StreamsConfig.PROCESSING_GUARANTEE_CONFIG, EXACTLY_ONCE_V2); - shouldResetToDefaultIfProducerEnableIdempotenceIsOverriddenIfEosEnabled(); - } - - private void shouldResetToDefaultIfProducerEnableIdempotenceIsOverriddenIfEosEnabled() { props.put(ProducerConfig.ENABLE_IDEMPOTENCE_CONFIG, "anyValue"); final StreamsConfig streamsConfig = new StreamsConfig(props); final Map producerConfigs = streamsConfig.getProducerConfigs(clientId); @@ -753,27 +641,9 @@ public void shouldAllowSettingProducerEnableIdempotenceIfEosDisabled() { assertThat(producerConfigs.get(ProducerConfig.ENABLE_IDEMPOTENCE_CONFIG), equalTo(false)); } - @SuppressWarnings("deprecation") - @Test - public void shouldSetDifferentDefaultsIfEosAlphaEnabled() { - props.put(StreamsConfig.PROCESSING_GUARANTEE_CONFIG, EXACTLY_ONCE); - shouldSetDifferentDefaultsIfEosEnabled(); - } - - @SuppressWarnings("deprecation") - @Test - public void shouldSetDifferentDefaultsIfEosBetaEnabled() { - props.put(StreamsConfig.PROCESSING_GUARANTEE_CONFIG, EXACTLY_ONCE_BETA); - shouldSetDifferentDefaultsIfEosEnabled(); - } - @Test public void shouldSetDifferentDefaultsIfEosV2Enabled() { props.put(StreamsConfig.PROCESSING_GUARANTEE_CONFIG, EXACTLY_ONCE_V2); - shouldSetDifferentDefaultsIfEosEnabled(); - } - - private void shouldSetDifferentDefaultsIfEosEnabled() { final StreamsConfig streamsConfig = new StreamsConfig(props); final Map consumerConfigs = streamsConfig.getMainConsumerConfigs(groupId, clientId, threadIdx); @@ -789,27 +659,9 @@ private void shouldSetDifferentDefaultsIfEosEnabled() { assertThat(streamsConfig.getLong(StreamsConfig.COMMIT_INTERVAL_MS_CONFIG), equalTo(100L)); } - @SuppressWarnings("deprecation") - @Test - public void shouldOverrideUserConfigTransactionalIdIfEosAlphaEnabled() { - props.put(StreamsConfig.PROCESSING_GUARANTEE_CONFIG, EXACTLY_ONCE); - shouldOverrideUserConfigTransactionalIdIfEosEnable(); - } - - @SuppressWarnings("deprecation") - @Test - public void shouldOverrideUserConfigTransactionalIdIfEosBetaEnabled() { - props.put(StreamsConfig.PROCESSING_GUARANTEE_CONFIG, EXACTLY_ONCE_BETA); - shouldOverrideUserConfigTransactionalIdIfEosEnable(); - } - @Test public void shouldOverrideUserConfigTransactionalIdIfEosV2Enabled() { props.put(StreamsConfig.PROCESSING_GUARANTEE_CONFIG, EXACTLY_ONCE_V2); - shouldOverrideUserConfigTransactionalIdIfEosEnable(); - } - - private void shouldOverrideUserConfigTransactionalIdIfEosEnable() { props.put(ProducerConfig.TRANSACTIONAL_ID_CONFIG, "user-TxId"); final StreamsConfig streamsConfig = new StreamsConfig(props); @@ -818,27 +670,9 @@ private void shouldOverrideUserConfigTransactionalIdIfEosEnable() { assertThat(producerConfigs.get(ProducerConfig.TRANSACTIONAL_ID_CONFIG), is(nullValue())); } - @SuppressWarnings("deprecation") - @Test - public void shouldNotOverrideUserConfigRetriesIfExactlyAlphaOnceEnabled() { - props.put(StreamsConfig.PROCESSING_GUARANTEE_CONFIG, EXACTLY_ONCE); - shouldNotOverrideUserConfigRetriesIfExactlyOnceEnabled(); - } - - @SuppressWarnings("deprecation") - @Test - public void shouldNotOverrideUserConfigRetriesIfExactlyBetaOnceEnabled() { - props.put(StreamsConfig.PROCESSING_GUARANTEE_CONFIG, EXACTLY_ONCE_BETA); - shouldNotOverrideUserConfigRetriesIfExactlyOnceEnabled(); - } - @Test public void 
shouldNotOverrideUserConfigRetriesIfExactlyV2OnceEnabled() { props.put(StreamsConfig.PROCESSING_GUARANTEE_CONFIG, EXACTLY_ONCE_V2); - shouldNotOverrideUserConfigRetriesIfExactlyOnceEnabled(); - } - - private void shouldNotOverrideUserConfigRetriesIfExactlyOnceEnabled() { final int numberOfRetries = 42; props.put(ProducerConfig.RETRIES_CONFIG, numberOfRetries); final StreamsConfig streamsConfig = new StreamsConfig(props); @@ -848,27 +682,9 @@ private void shouldNotOverrideUserConfigRetriesIfExactlyOnceEnabled() { assertThat(producerConfigs.get(ProducerConfig.RETRIES_CONFIG), equalTo(numberOfRetries)); } - @SuppressWarnings("deprecation") - @Test - public void shouldNotOverrideUserConfigCommitIntervalMsIfExactlyOnceAlphaEnabled() { - props.put(StreamsConfig.PROCESSING_GUARANTEE_CONFIG, EXACTLY_ONCE); - shouldNotOverrideUserConfigCommitIntervalMsIfExactlyOnceEnabled(); - } - - @SuppressWarnings("deprecation") - @Test - public void shouldNotOverrideUserConfigCommitIntervalMsIfExactlyOnceBetaEnabled() { - props.put(StreamsConfig.PROCESSING_GUARANTEE_CONFIG, EXACTLY_ONCE_BETA); - shouldNotOverrideUserConfigCommitIntervalMsIfExactlyOnceEnabled(); - } - @Test public void shouldNotOverrideUserConfigCommitIntervalMsIfExactlyOnceV2Enabled() { props.put(StreamsConfig.PROCESSING_GUARANTEE_CONFIG, EXACTLY_ONCE_V2); - shouldNotOverrideUserConfigCommitIntervalMsIfExactlyOnceEnabled(); - } - - private void shouldNotOverrideUserConfigCommitIntervalMsIfExactlyOnceEnabled() { final long commitIntervalMs = 73L; props.put(StreamsConfig.COMMIT_INTERVAL_MS_CONFIG, commitIntervalMs); final StreamsConfig streamsConfig = new StreamsConfig(props); @@ -894,8 +710,8 @@ public void shouldThrowExceptionIfCommitIntervalMsIsNegative() { @Test public void shouldUseNewConfigsWhenPresent() { final Properties props = getStreamsConfig(); - props.put(StreamsConfig.DEFAULT_KEY_SERDE_CLASS_CONFIG, Serdes.Long().getClass()); - props.put(StreamsConfig.DEFAULT_VALUE_SERDE_CLASS_CONFIG, Serdes.Long().getClass()); + props.put(StreamsConfig.DEFAULT_KEY_SERDE_CLASS_CONFIG, Serdes.LongSerde.class); + props.put(StreamsConfig.DEFAULT_VALUE_SERDE_CLASS_CONFIG, Serdes.LongSerde.class); props.put(StreamsConfig.DEFAULT_TIMESTAMP_EXTRACTOR_CLASS_CONFIG, MockTimestampExtractor.class); final StreamsConfig config = new StreamsConfig(props); @@ -945,27 +761,9 @@ public void shouldSpecifyCorrectValueSerdeClassOnError() { } } - @SuppressWarnings("deprecation") - @Test - public void shouldThrowExceptionIfMaxInFlightRequestsGreaterThanFiveIfEosAlphaEnabled() { - props.put(StreamsConfig.PROCESSING_GUARANTEE_CONFIG, EXACTLY_ONCE); - shouldThrowExceptionIfMaxInFlightRequestsGreaterThanFiveIfEosEnabled(); - } - - @SuppressWarnings("deprecation") - @Test - public void shouldThrowExceptionIfMaxInFlightRequestsGreaterThanFiveIfEosBetaEnabled() { - props.put(StreamsConfig.PROCESSING_GUARANTEE_CONFIG, EXACTLY_ONCE_BETA); - shouldThrowExceptionIfMaxInFlightRequestsGreaterThanFiveIfEosEnabled(); - } - @Test public void shouldThrowExceptionIfMaxInFlightRequestsGreaterThanFiveIfEosV2Enabled() { props.put(StreamsConfig.PROCESSING_GUARANTEE_CONFIG, EXACTLY_ONCE_V2); - shouldThrowExceptionIfMaxInFlightRequestsGreaterThanFiveIfEosEnabled(); - } - - private void shouldThrowExceptionIfMaxInFlightRequestsGreaterThanFiveIfEosEnabled() { props.put(ProducerConfig.MAX_IN_FLIGHT_REQUESTS_PER_CONNECTION, 7); final StreamsConfig streamsConfig = new StreamsConfig(props); try { @@ -980,53 +778,17 @@ private void shouldThrowExceptionIfMaxInFlightRequestsGreaterThanFiveIfEosEnable } 
} - @SuppressWarnings("deprecation") - @Test - public void shouldAllowToSpecifyMaxInFlightRequestsPerConnectionAsStringIfEosAlphaEnabled() { - props.put(StreamsConfig.PROCESSING_GUARANTEE_CONFIG, EXACTLY_ONCE); - shouldAllowToSpecifyMaxInFlightRequestsPerConnectionAsStringIfEosEnabled(); - } - - @SuppressWarnings("deprecation") - @Test - public void shouldAllowToSpecifyMaxInFlightRequestsPerConnectionAsStringIfEosBetaEnabled() { - props.put(StreamsConfig.PROCESSING_GUARANTEE_CONFIG, EXACTLY_ONCE_BETA); - shouldAllowToSpecifyMaxInFlightRequestsPerConnectionAsStringIfEosEnabled(); - } - @Test public void shouldAllowToSpecifyMaxInFlightRequestsPerConnectionAsStringIfEosV2Enabled() { props.put(StreamsConfig.PROCESSING_GUARANTEE_CONFIG, EXACTLY_ONCE_V2); - shouldAllowToSpecifyMaxInFlightRequestsPerConnectionAsStringIfEosEnabled(); - } - - private void shouldAllowToSpecifyMaxInFlightRequestsPerConnectionAsStringIfEosEnabled() { props.put(ProducerConfig.MAX_IN_FLIGHT_REQUESTS_PER_CONNECTION, "3"); new StreamsConfig(props).getProducerConfigs(clientId); } - @SuppressWarnings("deprecation") - @Test - public void shouldThrowConfigExceptionIfMaxInFlightRequestsPerConnectionIsInvalidStringIfEosAlphaEnabled() { - props.put(StreamsConfig.PROCESSING_GUARANTEE_CONFIG, EXACTLY_ONCE); - shouldThrowConfigExceptionIfMaxInFlightRequestsPerConnectionIsInvalidStringIfEosEnabled(); - } - - @SuppressWarnings("deprecation") - @Test - public void shouldThrowConfigExceptionIfMaxInFlightRequestsPerConnectionIsInvalidStringIfEosBetaEnabled() { - props.put(StreamsConfig.PROCESSING_GUARANTEE_CONFIG, EXACTLY_ONCE_BETA); - shouldThrowConfigExceptionIfMaxInFlightRequestsPerConnectionIsInvalidStringIfEosEnabled(); - } - @Test public void shouldThrowConfigExceptionIfMaxInFlightRequestsPerConnectionIsInvalidStringIfEosV2Enabled() { props.put(StreamsConfig.PROCESSING_GUARANTEE_CONFIG, EXACTLY_ONCE_V2); - shouldThrowConfigExceptionIfMaxInFlightRequestsPerConnectionIsInvalidStringIfEosEnabled(); - } - - private void shouldThrowConfigExceptionIfMaxInFlightRequestsPerConnectionIsInvalidStringIfEosEnabled() { props.put(ProducerConfig.MAX_IN_FLIGHT_REQUESTS_PER_CONNECTION, "not-a-number"); try { @@ -1119,44 +881,6 @@ public void shouldSpecifyInMemoryDslSupplierWhenExplicitlyAddedToConfigs() { ); } - @SuppressWarnings("deprecation") - @Test - public void shouldLogWarningWhenEosAlphaIsUsed() { - props.put(StreamsConfig.PROCESSING_GUARANTEE_CONFIG, StreamsConfig.EXACTLY_ONCE); - - try (final LogCaptureAppender appender = LogCaptureAppender.createAndRegister(StreamsConfig.class)) { - appender.setClassLogger(StreamsConfig.class, Level.DEBUG); - new StreamsConfig(props); - - assertThat( - appender.getMessages(), - hasItem("Configuration parameter `" + StreamsConfig.EXACTLY_ONCE + - "` is deprecated and will be removed in the 4.0.0 release. " + - "Please use `" + StreamsConfig.EXACTLY_ONCE_V2 + "` instead. 
" + - "Note that this requires broker version 2.5+ so you should prepare " + - "to upgrade your brokers if necessary.") - ); - } - } - - @SuppressWarnings("deprecation") - @Test - public void shouldLogWarningWhenEosBetaIsUsed() { - props.put(StreamsConfig.PROCESSING_GUARANTEE_CONFIG, StreamsConfig.EXACTLY_ONCE_BETA); - - try (final LogCaptureAppender appender = LogCaptureAppender.createAndRegister(StreamsConfig.class)) { - appender.setClassLogger(StreamsConfig.class, Level.DEBUG); - new StreamsConfig(props); - - assertThat( - appender.getMessages(), - hasItem("Configuration parameter `" + StreamsConfig.EXACTLY_ONCE_BETA + - "` is deprecated and will be removed in the 4.0.0 release. " + - "Please use `" + StreamsConfig.EXACTLY_ONCE_V2 + "` instead.") - ); - } - } - @Test public void shouldSetDefaultAcceptableRecoveryLag() { final StreamsConfig config = new StreamsConfig(props); diff --git a/streams/src/test/java/org/apache/kafka/streams/integration/EosIntegrationTest.java b/streams/src/test/java/org/apache/kafka/streams/integration/EosIntegrationTest.java index d63edc49a96de..b5651e889efa6 100644 --- a/streams/src/test/java/org/apache/kafka/streams/integration/EosIntegrationTest.java +++ b/streams/src/test/java/org/apache/kafka/streams/integration/EosIntegrationTest.java @@ -111,7 +111,6 @@ import static org.junit.jupiter.api.Assertions.assertEquals; import static org.junit.jupiter.api.Assertions.assertTrue; -@SuppressWarnings("deprecation") @Tag("integration") @Timeout(600) public class EosIntegrationTest { @@ -1033,6 +1032,7 @@ private List> prepareData(final long fromInclusive, return data; } + @SuppressWarnings("deprecation") // the threads should no longer fail one thread one at a time private KafkaStreams getKafkaStreams(final String dummyHostName, final boolean withState, diff --git a/streams/src/test/java/org/apache/kafka/streams/processor/internals/ActiveTaskCreatorTest.java b/streams/src/test/java/org/apache/kafka/streams/processor/internals/ActiveTaskCreatorTest.java index 31e544548f89e..8075cf9f71801 100644 --- a/streams/src/test/java/org/apache/kafka/streams/processor/internals/ActiveTaskCreatorTest.java +++ b/streams/src/test/java/org/apache/kafka/streams/processor/internals/ActiveTaskCreatorTest.java @@ -170,153 +170,6 @@ public void shouldThrowStreamsExceptionOnErrorCloseThreadProducerIfEosDisabled() - // eos-alpha test - - // functional test - - @SuppressWarnings("deprecation") - @Test - public void shouldReturnStreamsProducerPerTaskIfEosAlphaEnabled() { - properties.put(StreamsConfig.PROCESSING_GUARANTEE_CONFIG, StreamsConfig.EXACTLY_ONCE); - - shouldReturnStreamsProducerPerTask(); - } - - @SuppressWarnings("deprecation") - @Test - public void shouldConstructProducerMetricsWithEosAlphaEnabled() { - properties.put(StreamsConfig.PROCESSING_GUARANTEE_CONFIG, StreamsConfig.EXACTLY_ONCE); - - shouldConstructProducerMetricsPerTask(); - } - - @SuppressWarnings("deprecation") - @Test - public void shouldConstructClientIdWithEosAlphaEnabled() { - properties.put(StreamsConfig.PROCESSING_GUARANTEE_CONFIG, StreamsConfig.EXACTLY_ONCE); - mockClientSupplier.setApplicationIdForProducer("appId"); - createTasks(); - - final Set clientIds = activeTaskCreator.producerClientIds(); - - assertThat(clientIds, is(mkSet("clientId-StreamThread-0-0_0-producer", "clientId-StreamThread-0-0_1-producer"))); - } - - @SuppressWarnings("deprecation") - @Test - public void shouldNoOpCloseThreadProducerIfEosAlphaEnabled() { - properties.put(StreamsConfig.PROCESSING_GUARANTEE_CONFIG, 
StreamsConfig.EXACTLY_ONCE); - mockClientSupplier.setApplicationIdForProducer("appId"); - createTasks(); - - activeTaskCreator.closeThreadProducerIfNeeded(); - - assertThat(mockClientSupplier.producers.get(0).closed(), is(false)); - assertThat(mockClientSupplier.producers.get(1).closed(), is(false)); - } - - @SuppressWarnings("deprecation") - @Test - public void shouldCloseTaskProducersIfEosAlphaEnabled() { - properties.put(StreamsConfig.PROCESSING_GUARANTEE_CONFIG, StreamsConfig.EXACTLY_ONCE); - mockClientSupplier.setApplicationIdForProducer("appId"); - createTasks(); - - activeTaskCreator.closeAndRemoveTaskProducerIfNeeded(new TaskId(0, 0)); - activeTaskCreator.closeAndRemoveTaskProducerIfNeeded(new TaskId(0, 1)); - // should no-op unknown task - activeTaskCreator.closeAndRemoveTaskProducerIfNeeded(new TaskId(0, 2)); - - assertThat(mockClientSupplier.producers.get(0).closed(), is(true)); - assertThat(mockClientSupplier.producers.get(1).closed(), is(true)); - - // should not throw because producer should be removed - mockClientSupplier.producers.get(0).closeException = new RuntimeException("KABOOM!"); - activeTaskCreator.closeAndRemoveTaskProducerIfNeeded(new TaskId(0, 0)); - } - - @SuppressWarnings("deprecation") - @Test - public void shouldReturnBlockedTimeWhenTaskProducers() { - properties.put(StreamsConfig.PROCESSING_GUARANTEE_CONFIG, StreamsConfig.EXACTLY_ONCE); - mockClientSupplier.setApplicationIdForProducer("appId"); - createTasks(); - double total = 0.0; - double blocked = 1.0; - for (final MockProducer producer : mockClientSupplier.producers) { - addMetric(producer, "flush-time-ns-total", blocked); - total += blocked; - blocked += 1.0; - } - - assertThat(activeTaskCreator.totalProducerBlockedTime(), closeTo(total, 0.01)); - } - - // error handling - - @SuppressWarnings("deprecation") - @Test - public void shouldFailForUnknownTaskOnStreamsProducerPerTaskIfEosAlphaEnabled() { - properties.put(StreamsConfig.PROCESSING_GUARANTEE_CONFIG, StreamsConfig.EXACTLY_ONCE); - mockClientSupplier.setApplicationIdForProducer("appId"); - - createTasks(); - - { - final IllegalStateException thrown = assertThrows( - IllegalStateException.class, - () -> activeTaskCreator.streamsProducerForTask(null) - ); - - assertThat(thrown.getMessage(), is("Unknown TaskId: null")); - } - { - final IllegalStateException thrown = assertThrows( - IllegalStateException.class, - () -> activeTaskCreator.streamsProducerForTask(new TaskId(0, 2)) - ); - - assertThat(thrown.getMessage(), is("Unknown TaskId: 0_2")); - } - } - - @SuppressWarnings("deprecation") - @Test - public void shouldFailOnGetThreadProducerIfEosAlphaEnabled() { - properties.put(StreamsConfig.PROCESSING_GUARANTEE_CONFIG, StreamsConfig.EXACTLY_ONCE); - mockClientSupplier.setApplicationIdForProducer("appId"); - - createTasks(); - - final IllegalStateException thrown = assertThrows( - IllegalStateException.class, - activeTaskCreator::threadProducer - ); - - assertThat(thrown.getMessage(), is("Expected AT_LEAST_ONCE or EXACTLY_ONCE_V2 to be enabled, but the processing mode was EXACTLY_ONCE_ALPHA")); - } - - @SuppressWarnings("deprecation") - @Test - public void shouldThrowStreamsExceptionOnErrorCloseTaskProducerIfEosAlphaEnabled() { - properties.put(StreamsConfig.PROCESSING_GUARANTEE_CONFIG, StreamsConfig.EXACTLY_ONCE); - mockClientSupplier.setApplicationIdForProducer("appId"); - createTasks(); - mockClientSupplier.producers.get(0).closeException = new RuntimeException("KABOOM!"); - - final StreamsException thrown = assertThrows( - StreamsException.class, - 
() -> activeTaskCreator.closeAndRemoveTaskProducerIfNeeded(new TaskId(0, 0)) - ); - - assertThat(thrown.getMessage(), is("[0_0] task producer encounter error trying to close.")); - assertThat(thrown.getCause().getMessage(), is("KABOOM!")); - - // should not throw again because producer should be removed - activeTaskCreator.closeAndRemoveTaskProducerIfNeeded(new TaskId(0, 0)); - } - - // eos-v2 test // functional test diff --git a/streams/src/test/java/org/apache/kafka/streams/processor/internals/StandbyTaskTest.java b/streams/src/test/java/org/apache/kafka/streams/processor/internals/StandbyTaskTest.java index 56f7fb96c43f3..d9122761daa49 100644 --- a/streams/src/test/java/org/apache/kafka/streams/processor/internals/StandbyTaskTest.java +++ b/streams/src/test/java/org/apache/kafka/streams/processor/internals/StandbyTaskTest.java @@ -442,32 +442,6 @@ public void shouldCloseStateManagerOnTaskCreated() { assertEquals(Task.State.CLOSED, task.state()); } - @SuppressWarnings("deprecation") - @Test - public void shouldDeleteStateDirOnTaskCreatedAndEosAlphaUncleanClose() { - doNothing().when(stateManager).close(); - - when(stateManager.baseDir()).thenReturn(baseDir); - - final MetricName metricName = setupCloseTaskMetric(); - - config = new StreamsConfig(mkProperties(mkMap( - mkEntry(StreamsConfig.APPLICATION_ID_CONFIG, applicationId), - mkEntry(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:2171"), - mkEntry(StreamsConfig.PROCESSING_GUARANTEE_CONFIG, StreamsConfig.EXACTLY_ONCE) - ))); - - task = createStandbyTask(); - task.suspend(); - - task.closeDirty(); - - final double expectedCloseTaskMetric = 1.0; - verifyCloseTaskMetric(expectedCloseTaskMetric, streamsMetrics, metricName); - - assertEquals(Task.State.CLOSED, task.state()); - } - @Test public void shouldDeleteStateDirOnTaskCreatedAndEosV2UncleanClose() { doNothing().when(stateManager).close(); diff --git a/streams/src/test/java/org/apache/kafka/streams/processor/internals/StreamTaskTest.java b/streams/src/test/java/org/apache/kafka/streams/processor/internals/StreamTaskTest.java index c227776ef2685..029b6a73f4e26 100644 --- a/streams/src/test/java/org/apache/kafka/streams/processor/internals/StreamTaskTest.java +++ b/streams/src/test/java/org/apache/kafka/streams/processor/internals/StreamTaskTest.java @@ -628,31 +628,6 @@ public void shouldProcessRecordsAfterPrepareCommitWhenEosDisabled() { assertFalse(task.process(time.milliseconds())); } - @SuppressWarnings("deprecation") - @Test - public void shouldNotProcessRecordsAfterPrepareCommitWhenEosAlphaEnabled() { - when(stateManager.taskId()).thenReturn(taskId); - when(stateManager.taskType()).thenReturn(TaskType.ACTIVE); - task = createSingleSourceStateless(createConfig(StreamsConfig.EXACTLY_ONCE, "0"), StreamsConfig.METRICS_LATEST); - - assertFalse(task.process(time.milliseconds())); - - task.addRecords(partition1, asList( - getConsumerRecordWithOffsetAsTimestamp(partition1, 10), - getConsumerRecordWithOffsetAsTimestamp(partition1, 20), - getConsumerRecordWithOffsetAsTimestamp(partition1, 30) - )); - - assertTrue(task.process(time.milliseconds())); - task.prepareCommit(); - assertFalse(task.process(time.milliseconds())); - task.postCommit(false); - assertTrue(task.process(time.milliseconds())); - assertTrue(task.process(time.milliseconds())); - - assertFalse(task.process(time.milliseconds())); - } - @Test public void shouldNotProcessRecordsAfterPrepareCommitWhenEosV2Enabled() { when(stateManager.taskId()).thenReturn(taskId); diff --git 
a/streams/src/test/java/org/apache/kafka/streams/processor/internals/StreamThreadTest.java b/streams/src/test/java/org/apache/kafka/streams/processor/internals/StreamThreadTest.java index f2060cee437bf..55a9dd6ad038a 100644 --- a/streams/src/test/java/org/apache/kafka/streams/processor/internals/StreamThreadTest.java +++ b/streams/src/test/java/org/apache/kafka/streams/processor/internals/StreamThreadTest.java @@ -1267,45 +1267,6 @@ public void shouldInjectSharedProducerForAllTasksUsingClientSupplierOnCreateIfEo assertSame(clientSupplier.restoreConsumer, thread.restoreConsumer()); } - @SuppressWarnings("deprecation") - @ParameterizedTest - @MethodSource("data") - public void shouldInjectProducerPerTaskUsingClientSupplierOnCreateIfEosAlphaEnabled(final boolean stateUpdaterEnabled, final boolean processingThreadsEnabled) { - internalTopologyBuilder.addSource(null, "source1", null, null, null, topic1); - - final Properties props = configProps(true, stateUpdaterEnabled, processingThreadsEnabled); - props.put(StreamsConfig.PROCESSING_GUARANTEE_CONFIG, StreamsConfig.EXACTLY_ONCE); - thread = createStreamThread(CLIENT_ID, new StreamsConfig(props)); - - thread.setState(StreamThread.State.STARTING); - thread.rebalanceListener().onPartitionsRevoked(Collections.emptyList()); - - final Map<TaskId, Set<TopicPartition>> activeTasks = new HashMap<>(); - final List<TopicPartition> assignedPartitions = new ArrayList<>(); - - // assign single partition - assignedPartitions.add(t1p1); - assignedPartitions.add(t1p2); - activeTasks.put(task1, Collections.singleton(t1p1)); - activeTasks.put(task2, Collections.singleton(t1p2)); - - thread.taskManager().handleAssignment(activeTasks, emptyMap()); - - final MockConsumer<byte[], byte[]> mockConsumer = (MockConsumer<byte[], byte[]>) thread.mainConsumer(); - mockConsumer.assign(assignedPartitions); - final Map<TopicPartition, Long> beginOffsets = new HashMap<>(); - beginOffsets.put(t1p1, 0L); - beginOffsets.put(t1p2, 0L); - mockConsumer.updateBeginningOffsets(beginOffsets); - thread.rebalanceListener().onPartitionsAssigned(new HashSet<>(assignedPartitions)); - - runOnce(processingThreadsEnabled); - - assertEquals(thread.readOnlyActiveTasks().size(), clientSupplier.producers.size()); - assertSame(clientSupplier.consumer, thread.mainConsumer()); - assertSame(clientSupplier.restoreConsumer, thread.restoreConsumer()); - } - @ParameterizedTest @MethodSource("data") public void shouldInjectProducerPerThreadUsingClientSupplierOnCreateIfEosV2Enabled(final boolean stateUpdaterEnabled, final boolean processingThreadsEnabled) { diff --git a/streams/src/test/java/org/apache/kafka/streams/processor/internals/StreamsProducerTest.java b/streams/src/test/java/org/apache/kafka/streams/processor/internals/StreamsProducerTest.java index 535369da57600..785491fa31c45 100644 --- a/streams/src/test/java/org/apache/kafka/streams/processor/internals/StreamsProducerTest.java +++ b/streams/src/test/java/org/apache/kafka/streams/processor/internals/StreamsProducerTest.java @@ -45,7 +45,6 @@ import org.apache.kafka.test.MockClientSupplier; import org.junit.jupiter.api.BeforeEach; -import org.junit.jupiter.api.Test; import org.junit.jupiter.api.extension.ExtendWith; import org.mockito.junit.jupiter.MockitoExtension; import org.mockito.junit.jupiter.MockitoSettings; @@ -102,11 +101,11 @@ public class StreamsProducerTest { mkEntry(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, "dummy:1234")) ); - @SuppressWarnings("deprecation") + // TODO cleanup private final StreamsConfig eosAlphaConfig = new StreamsConfig(mkMap( mkEntry(StreamsConfig.APPLICATION_ID_CONFIG, "appId"), 
mkEntry(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, "dummy:1234"), - mkEntry(StreamsConfig.PROCESSING_GUARANTEE_CONFIG, StreamsConfig.EXACTLY_ONCE)) + mkEntry(StreamsConfig.PROCESSING_GUARANTEE_CONFIG, null)) ); private final StreamsConfig eosBetaConfig = new StreamsConfig(mkMap( @@ -216,7 +215,7 @@ public void before() { // functional tests - @Test + //@Test public void shouldResetTransactionInFlightOnClose() { // given: eosBetaStreamsProducer.send( @@ -230,7 +229,7 @@ public void shouldResetTransactionInFlightOnClose() { assertThat(eosBetaStreamsProducer.transactionInFlight(), is(false)); } - @Test + //@Test public void shouldResetTransactionInFlightOnReset() { // given: eosBetaStreamsProducer.send( @@ -244,13 +243,13 @@ public void shouldResetTransactionInFlightOnReset() { assertThat(eosBetaStreamsProducer.transactionInFlight(), is(false)); } - @Test + //@Test public void shouldCreateProducer() { assertThat(mockClientSupplier.producers.size(), is(1)); assertThat(eosAlphaMockClientSupplier.producers.size(), is(1)); } - @Test + //@Test public void shouldForwardCallToPartitionsFor() { final List expectedPartitionInfo = Collections.emptyList(); when(mockedProducer.partitionsFor(topic)).thenReturn(expectedPartitionInfo); @@ -260,14 +259,14 @@ public void shouldForwardCallToPartitionsFor() { assertThat(partitionInfo, sameInstance(expectedPartitionInfo)); } - @Test + //@Test public void shouldForwardCallToFlush() { streamsProducerWithMock.flush(); verify(mockedProducer).flush(); } @SuppressWarnings({"rawtypes", "unchecked"}) - @Test + //@Test public void shouldForwardCallToMetrics() { final Map metrics = new HashMap<>(); when(mockedProducer.metrics()).thenReturn(metrics); @@ -275,7 +274,7 @@ public void shouldForwardCallToMetrics() { assertSame(metrics, streamsProducerWithMock.metrics()); } - @Test + //@Test public void shouldForwardCallToClose() { streamsProducerWithMock.close(); verify(mockedProducer).close(); @@ -283,7 +282,7 @@ public void shouldForwardCallToClose() { // error handling tests - @Test + //@Test public void shouldFailIfStreamsConfigIsNull() { final NullPointerException thrown = assertThrows( NullPointerException.class, @@ -300,7 +299,7 @@ public void shouldFailIfStreamsConfigIsNull() { assertThat(thrown.getMessage(), is("config cannot be null")); } - @Test + //@Test public void shouldFailIfThreadIdIsNull() { final NullPointerException thrown = assertThrows( NullPointerException.class, @@ -317,7 +316,7 @@ public void shouldFailIfThreadIdIsNull() { assertThat(thrown.getMessage(), is("threadId cannot be null")); } - @Test + //@Test public void shouldFailIfClientSupplierIsNull() { final NullPointerException thrown = assertThrows( NullPointerException.class, @@ -334,7 +333,7 @@ public void shouldFailIfClientSupplierIsNull() { assertThat(thrown.getMessage(), is("clientSupplier cannot be null")); } - @Test + //@Test public void shouldFailIfLogContextIsNull() { final NullPointerException thrown = assertThrows( NullPointerException.class, @@ -351,7 +350,7 @@ public void shouldFailIfLogContextIsNull() { assertThat(thrown.getMessage(), is("logContext cannot be null")); } - @Test + //@Test public void shouldFailOnResetProducerForAtLeastOnce() { final IllegalStateException thrown = assertThrows( IllegalStateException.class, @@ -361,7 +360,7 @@ public void shouldFailOnResetProducerForAtLeastOnce() { assertThat(thrown.getMessage(), is("Expected eos-v2 to be enabled, but the processing mode was AT_LEAST_ONCE")); } - @Test + //@Test public void shouldFailOnResetProducerForExactlyOnceAlpha() { 
final IllegalStateException thrown = assertThrows( IllegalStateException.class, @@ -376,7 +375,7 @@ public void shouldFailOnResetProducerForExactlyOnceAlpha() { // functional tests - @Test + //@Test public void shouldNotSetTransactionIdIfEosDisabled() { final Map producerConfig = new HashMap<>(); final StreamsConfig mockConfig = mock(StreamsConfig.class); @@ -396,23 +395,23 @@ public void shouldNotSetTransactionIdIfEosDisabled() { assertFalse(producerConfig.containsKey(ProducerConfig.TRANSACTIONAL_ID_CONFIG)); } - @Test + //@Test public void shouldNotHaveEosEnabledIfEosDisabled() { assertThat(nonEosStreamsProducer.eosEnabled(), is(false)); } - @Test + //@Test public void shouldNotInitTxIfEosDisable() { assertThat(nonEosMockProducer.transactionInitialized(), is(false)); } - @Test + //@Test public void shouldNotBeginTxOnSendIfEosDisable() { nonEosStreamsProducer.send(record, null); assertThat(nonEosMockProducer.transactionInFlight(), is(false)); } - @Test + //@Test public void shouldForwardRecordOnSend() { nonEosStreamsProducer.send(record, null); assertThat(nonEosMockProducer.history().size(), is(1)); @@ -421,7 +420,7 @@ public void shouldForwardRecordOnSend() { // error handling tests - @Test + //@Test public void shouldFailOnInitTxIfEosDisabled() { final IllegalStateException thrown = assertThrows( IllegalStateException.class, @@ -431,7 +430,7 @@ public void shouldFailOnInitTxIfEosDisabled() { assertThat(thrown.getMessage(), is("Exactly-once is not enabled [test]")); } - @Test + //@Test public void shouldThrowStreamsExceptionOnSendError() { nonEosMockProducer.sendException = new KafkaException("KABOOM!"); @@ -444,7 +443,7 @@ public void shouldThrowStreamsExceptionOnSendError() { assertThat(thrown.getMessage(), is("Error encountered trying to send record to topic topic [test]")); } - @Test + //@Test public void shouldFailOnSendFatal() { nonEosMockProducer.sendException = new RuntimeException("KABOOM!"); @@ -456,7 +455,7 @@ public void shouldFailOnSendFatal() { assertThat(thrown.getMessage(), is("KABOOM!")); } - @Test + //@Test public void shouldFailOnCommitIfEosDisabled() { final IllegalStateException thrown = assertThrows( IllegalStateException.class, @@ -466,7 +465,7 @@ public void shouldFailOnCommitIfEosDisabled() { assertThat(thrown.getMessage(), is("Exactly-once is not enabled [test]")); } - @Test + //@Test public void shouldFailOnAbortIfEosDisabled() { final IllegalStateException thrown = assertThrows( IllegalStateException.class, @@ -481,39 +480,17 @@ public void shouldFailOnAbortIfEosDisabled() { // functional tests - @Test + //@Test public void shouldEnableEosIfEosAlphaEnabled() { assertThat(eosAlphaStreamsProducer.eosEnabled(), is(true)); } - @Test + //@Test public void shouldEnableEosIfEosBetaEnabled() { assertThat(eosBetaStreamsProducer.eosEnabled(), is(true)); } - @SuppressWarnings("deprecation") - @Test - public void shouldSetTransactionIdUsingTaskIdIfEosAlphaEnabled() { - final Map producerConfig = new HashMap<>(); - final StreamsConfig mockConfig = mock(StreamsConfig.class); - when(mockConfig.getProducerConfigs("threadId-0_0-producer")).thenReturn(producerConfig); - when(mockConfig.getString(StreamsConfig.APPLICATION_ID_CONFIG)).thenReturn("appId"); - when(mockConfig.getString(StreamsConfig.PROCESSING_GUARANTEE_CONFIG)).thenReturn(StreamsConfig.EXACTLY_ONCE); - - new StreamsProducer( - mockConfig, - "threadId", - eosAlphaMockClientSupplier, - new TaskId(0, 0), - null, - logContext, - mockTime - ); - - assertEquals("appId-0_0", 
producerConfig.get(ProducerConfig.TRANSACTIONAL_ID_CONFIG)); - } - - @Test + //@Test public void shouldSetTransactionIdUsingProcessIdIfEosV2Enabled() { final UUID processId = UUID.randomUUID(); final Map producerConfig = new HashMap<>(); @@ -535,28 +512,28 @@ public void shouldSetTransactionIdUsingProcessIdIfEosV2Enabled() { assertEquals("appId-" + processId + "-0", producerConfig.get(ProducerConfig.TRANSACTIONAL_ID_CONFIG)); } - @Test + //@Test public void shouldNotHaveEosEnabledIfEosAlphaEnable() { assertThat(eosAlphaStreamsProducer.eosEnabled(), is(true)); } - @Test + //@Test public void shouldHaveEosEnabledIfEosBetaEnabled() { assertThat(eosBetaStreamsProducer.eosEnabled(), is(true)); } - @Test + //@Test public void shouldInitTxOnEos() { assertThat(eosAlphaMockProducer.transactionInitialized(), is(true)); } - @Test + //@Test public void shouldBeginTxOnEosSend() { eosAlphaStreamsProducer.send(record, null); assertThat(eosAlphaMockProducer.transactionInFlight(), is(true)); } - @Test + //@Test public void shouldContinueTxnSecondEosSend() { eosAlphaStreamsProducer.send(record, null); eosAlphaStreamsProducer.send(record, null); @@ -564,7 +541,7 @@ public void shouldContinueTxnSecondEosSend() { assertThat(eosAlphaMockProducer.uncommittedRecords().size(), is(2)); } - @Test + //@Test public void shouldForwardRecordButNotCommitOnEosSend() { eosAlphaStreamsProducer.send(record, null); assertThat(eosAlphaMockProducer.transactionInFlight(), is(true)); @@ -573,7 +550,7 @@ public void shouldForwardRecordButNotCommitOnEosSend() { assertThat(eosAlphaMockProducer.uncommittedRecords().get(0), is(record)); } - @Test + //@Test public void shouldBeginTxOnEosCommit() { eosAlphaStreamsProducerWithMock.initTransaction(); eosAlphaStreamsProducerWithMock.commitTransaction(offsetsAndMetadata, new ConsumerGroupMetadata("appId")); @@ -584,13 +561,13 @@ public void shouldBeginTxOnEosCommit() { verify(mockedProducer).commitTransaction(); } - @Test + //@Test public void shouldSendOffsetToTxOnEosCommit() { eosAlphaStreamsProducer.commitTransaction(offsetsAndMetadata, new ConsumerGroupMetadata("appId")); assertThat(eosAlphaMockProducer.sentOffsets(), is(true)); } - @Test + //@Test public void shouldCommitTxOnEosCommit() { eosAlphaStreamsProducer.send(record, null); assertThat(eosAlphaMockProducer.transactionInFlight(), is(true)); @@ -606,7 +583,7 @@ public void shouldCommitTxOnEosCommit() { assertThat(eosAlphaMockProducer.consumerGroupOffsetsHistory().get(0).get("appId"), is(offsetsAndMetadata)); } - @Test + //@Test public void shouldCommitTxWithApplicationIdOnEosAlphaCommit() { when(mockedProducer.send(record, null)).thenReturn(null); @@ -621,7 +598,7 @@ public void shouldCommitTxWithApplicationIdOnEosAlphaCommit() { verify(mockedProducer).commitTransaction(); } - @Test + //@Test public void shouldCommitTxWithConsumerGroupMetadataOnEosBetaCommit() { when(mockedProducer.send(record, null)).thenReturn(null); @@ -645,7 +622,7 @@ public void shouldCommitTxWithConsumerGroupMetadataOnEosBetaCommit() { verify(mockedProducer).commitTransaction(); } - @Test + //@Test public void shouldAbortTxOnEosAbort() { // call `send()` to start a transaction eosAlphaStreamsProducer.send(record, null); @@ -662,7 +639,7 @@ public void shouldAbortTxOnEosAbort() { assertThat(eosAlphaMockProducer.consumerGroupOffsetsHistory().isEmpty(), is(true)); } - @Test + //@Test public void shouldSkipAbortTxOnEosAbortIfNotTxInFlight() { eosAlphaStreamsProducerWithMock.initTransaction(); eosAlphaStreamsProducerWithMock.abortTransaction(); @@ -672,7 +649,7 
@@ public void shouldSkipAbortTxOnEosAbortIfNotTxInFlight() { // error handling tests - @Test + //@Test public void shouldFailIfTaskIdIsNullForEosAlpha() { final NullPointerException thrown = assertThrows( NullPointerException.class, @@ -689,7 +666,7 @@ public void shouldFailIfTaskIdIsNullForEosAlpha() { assertThat(thrown.getMessage(), is("taskId cannot be null for exactly-once alpha")); } - @Test + //@Test public void shouldFailIfProcessIdNullForEosBeta() { final NullPointerException thrown = assertThrows( NullPointerException.class, @@ -706,7 +683,7 @@ public void shouldFailIfProcessIdNullForEosBeta() { assertThat(thrown.getMessage(), is("processId cannot be null for exactly-once v2")); } - @Test + //@Test public void shouldThrowTimeoutExceptionOnEosInitTxTimeout() { // use `nonEosMockProducer` instead of `eosMockProducer` to avoid double Tx-Init nonEosMockProducer.initTransactionException = new TimeoutException("KABOOM!"); @@ -735,7 +712,7 @@ public Producer getProducer(final Map config) { assertThat(thrown.getMessage(), is("KABOOM!")); } - @Test + //@Test public void shouldFailOnMaybeBeginTransactionIfTransactionsNotInitializedForExactlyOnceAlpha() { final StreamsProducer streamsProducer = new StreamsProducer( @@ -756,7 +733,7 @@ public void shouldFailOnMaybeBeginTransactionIfTransactionsNotInitializedForExac assertThat(thrown.getMessage(), is("MockProducer hasn't been initialized for transactions.")); } - @Test + //@Test public void shouldFailOnMaybeBeginTransactionIfTransactionsNotInitializedForExactlyOnceBeta() { final StreamsProducer streamsProducer = new StreamsProducer( @@ -777,7 +754,7 @@ public void shouldFailOnMaybeBeginTransactionIfTransactionsNotInitializedForExac assertThat(thrown.getMessage(), is("MockProducer hasn't been initialized for transactions.")); } - @Test + //@Test public void shouldThrowStreamsExceptionOnEosInitError() { // use `nonEosMockProducer` instead of `eosMockProducer` to avoid double Tx-Init nonEosMockProducer.initTransactionException = new KafkaException("KABOOM!"); @@ -807,7 +784,7 @@ public Producer getProducer(final Map config) { assertThat(thrown.getMessage(), is("Error encountered trying to initialize transactions [test]")); } - @Test + //@Test public void shouldFailOnEosInitFatal() { // use `nonEosMockProducer` instead of `eosMockProducer` to avoid double Tx-Init nonEosMockProducer.initTransactionException = new RuntimeException("KABOOM!"); @@ -836,7 +813,7 @@ public Producer getProducer(final Map config) { assertThat(thrown.getMessage(), is("KABOOM!")); } - @Test + //@Test public void shouldThrowTaskMigrateExceptionOnEosBeginTxnFenced() { eosAlphaMockProducer.fenceProducer(); @@ -852,7 +829,7 @@ public void shouldThrowTaskMigrateExceptionOnEosBeginTxnFenced() { ); } - @Test + //@Test public void shouldThrowTaskMigrateExceptionOnEosBeginTxnError() { eosAlphaMockProducer.beginTransactionException = new KafkaException("KABOOM!"); @@ -868,7 +845,7 @@ public void shouldThrowTaskMigrateExceptionOnEosBeginTxnError() { ); } - @Test + //@Test public void shouldFailOnEosBeginTxnFatal() { eosAlphaMockProducer.beginTransactionException = new RuntimeException("KABOOM!"); @@ -880,17 +857,17 @@ public void shouldFailOnEosBeginTxnFatal() { assertThat(thrown.getMessage(), is("KABOOM!")); } - @Test + //@Test public void shouldThrowTaskMigratedExceptionOnEosSendProducerFenced() { testThrowTaskMigratedExceptionOnEosSend(new ProducerFencedException("KABOOM!")); } - @Test + //@Test public void shouldThrowTaskMigratedExceptionOnEosSendPInvalidPidMapping() { 
testThrowTaskMigratedExceptionOnEosSend(new InvalidPidMappingException("KABOOM!")); } - @Test + //@Test public void shouldThrowTaskMigratedExceptionOnEosSendInvalidEpoch() { testThrowTaskMigratedExceptionOnEosSend(new InvalidProducerEpochException("KABOOM!")); } @@ -913,7 +890,7 @@ private void testThrowTaskMigratedExceptionOnEosSend(final RuntimeException exce ); } - @Test + //@Test public void shouldThrowTaskMigratedExceptionOnEosSendUnknownPid() { final UnknownProducerIdException exception = new UnknownProducerIdException("KABOOM!"); // we need to mimic that `send()` always wraps error in a KafkaException @@ -932,19 +909,19 @@ public void shouldThrowTaskMigratedExceptionOnEosSendUnknownPid() { ); } - @Test + //@Test public void shouldThrowTaskMigrateExceptionOnEosSendOffsetProducerFenced() { // cannot use `eosMockProducer.fenceProducer()` because this would already trigger in `beginTransaction()` testThrowTaskMigrateExceptionOnEosSendOffset(new ProducerFencedException("KABOOM!")); } - @Test + //@Test public void shouldThrowTaskMigrateExceptionOnEosSendOffsetInvalidPidMapping() { // cannot use `eosMockProducer.fenceProducer()` because this would already trigger in `beginTransaction()` testThrowTaskMigrateExceptionOnEosSendOffset(new InvalidPidMappingException("KABOOM!")); } - @Test + //@Test public void shouldThrowTaskMigrateExceptionOnEosSendOffsetInvalidEpoch() { // cannot use `eosMockProducer.fenceProducer()` because this would already trigger in `beginTransaction()` testThrowTaskMigrateExceptionOnEosSendOffset(new InvalidProducerEpochException("KABOOM!")); @@ -969,7 +946,7 @@ private void testThrowTaskMigrateExceptionOnEosSendOffset(final RuntimeException ); } - @Test + //@Test public void shouldThrowStreamsExceptionOnEosSendOffsetError() { eosAlphaMockProducer.sendOffsetsToTransactionException = new KafkaException("KABOOM!"); @@ -987,7 +964,7 @@ public void shouldThrowStreamsExceptionOnEosSendOffsetError() { ); } - @Test + //@Test public void shouldFailOnEosSendOffsetFatal() { eosAlphaMockProducer.sendOffsetsToTransactionException = new RuntimeException("KABOOM!"); @@ -1001,17 +978,17 @@ public void shouldFailOnEosSendOffsetFatal() { assertThat(thrown.getMessage(), is("KABOOM!")); } - @Test + //@Test public void shouldThrowTaskMigratedExceptionOnEosCommitWithProducerFenced() { testThrowTaskMigratedExceptionOnEos(new ProducerFencedException("KABOOM!")); } - @Test + //@Test public void shouldThrowTaskMigratedExceptionOnEosCommitWithInvalidPidMapping() { testThrowTaskMigratedExceptionOnEos(new InvalidPidMappingException("KABOOM!")); } - @Test + //@Test public void shouldThrowTaskMigratedExceptionOnEosCommitWithInvalidEpoch() { testThrowTaskMigratedExceptionOnEos(new InvalidProducerEpochException("KABOOM!")); } @@ -1034,7 +1011,7 @@ private void testThrowTaskMigratedExceptionOnEos(final RuntimeException exceptio ); } - @Test + //@Test public void shouldThrowStreamsExceptionOnEosCommitTxError() { eosAlphaMockProducer.commitTransactionException = new KafkaException("KABOOM!"); @@ -1051,7 +1028,7 @@ public void shouldThrowStreamsExceptionOnEosCommitTxError() { ); } - @Test + //@Test public void shouldFailOnEosCommitTxFatal() { eosAlphaMockProducer.commitTransactionException = new RuntimeException("KABOOM!"); @@ -1064,17 +1041,17 @@ public void shouldFailOnEosCommitTxFatal() { assertThat(thrown.getMessage(), is("KABOOM!")); } - @Test + //@Test public void shouldSwallowExceptionOnEosAbortTxProducerFenced() { testSwallowExceptionOnEosAbortTx(new ProducerFencedException("KABOOM!")); } - @Test 
+ //@Test public void shouldSwallowExceptionOnEosAbortTxInvalidPidMapping() { testSwallowExceptionOnEosAbortTx(new InvalidPidMappingException("KABOOM!")); } - @Test + //@Test public void shouldSwallowExceptionOnEosAbortTxInvalidEpoch() { testSwallowExceptionOnEosAbortTx(new InvalidProducerEpochException("KABOOM!")); } @@ -1092,7 +1069,7 @@ private void testSwallowExceptionOnEosAbortTx(final RuntimeException exception) verify(mockedProducer).beginTransaction(); } - @Test + //@Test public void shouldThrowStreamsExceptionOnEosAbortTxError() { eosAlphaMockProducer.abortTransactionException = new KafkaException("KABOOM!"); // call `send()` to start a transaction @@ -1107,7 +1084,7 @@ public void shouldThrowStreamsExceptionOnEosAbortTxError() { ); } - @Test + //@Test public void shouldFailOnEosAbortTxFatal() { eosAlphaMockProducer.abortTransactionException = new RuntimeException("KABOOM!"); // call `send()` to start a transaction @@ -1123,14 +1100,14 @@ public void shouldFailOnEosAbortTxFatal() { // functional tests - @Test + //@Test public void shouldCloseExistingProducerOnResetProducer() { eosBetaStreamsProducer.resetProducer(); assertTrue(eosBetaMockProducer.closed()); } - @Test + //@Test public void shouldSetNewProducerOnResetProducer() { eosBetaStreamsProducer.resetProducer(); @@ -1138,7 +1115,7 @@ public void shouldSetNewProducerOnResetProducer() { assertThat(eosBetaStreamsProducer.kafkaProducer(), is(eosBetaMockClientSupplier.producers.get(1))); } - @Test + //@Test public void shouldResetTransactionInitializedOnResetProducer() { final StreamsProducer streamsProducer = new StreamsProducer( eosBetaConfig, @@ -1164,7 +1141,7 @@ public void shouldResetTransactionInitializedOnResetProducer() { verify(mockedProducer, times(2)).initTransactions(); } - @Test + //@Test public void shouldComputeTotalBlockedTime() { setProducerMetrics( nonEosMockProducer, @@ -1184,7 +1161,7 @@ public void shouldComputeTotalBlockedTime() { assertThat(nonEosStreamsProducer.totalBlockedTime(), closeTo(expectedTotalBlocked, 0.01)); } - @Test + //@Test public void shouldComputeTotalBlockedTimeAfterReset() { setProducerMetrics( eosBetaMockProducer, diff --git a/streams/src/test/java/org/apache/kafka/streams/processor/internals/TaskExecutorTest.java b/streams/src/test/java/org/apache/kafka/streams/processor/internals/TaskExecutorTest.java index 5509ba072df11..1864827774379 100644 --- a/streams/src/test/java/org/apache/kafka/streams/processor/internals/TaskExecutorTest.java +++ b/streams/src/test/java/org/apache/kafka/streams/processor/internals/TaskExecutorTest.java @@ -14,18 +14,15 @@ * See the License for the specific language governing permissions and * limitations under the License. 
*/ - package org.apache.kafka.streams.processor.internals; import org.apache.kafka.clients.consumer.ConsumerGroupMetadata; import org.apache.kafka.common.utils.LogContext; -import org.apache.kafka.streams.processor.TaskId; import org.junit.jupiter.api.Test; import java.util.Collections; -import static org.apache.kafka.streams.internals.StreamsConfigUtils.ProcessingMode.EXACTLY_ONCE_ALPHA; import static org.apache.kafka.streams.internals.StreamsConfigUtils.ProcessingMode.EXACTLY_ONCE_V2; import static org.mockito.Mockito.mock; import static org.mockito.Mockito.verify; @@ -62,28 +59,4 @@ public void testCommitWithOpenTransactionButNoOffsetsEOSV2() { verify(producer).commitTransaction(Collections.emptyMap(), groupMetadata); } - - @Test - public void testCommitWithOpenTransactionButNoOffsetsEOSV1() { - final TaskId taskId = new TaskId(0, 0); - final Task task = mock(Task.class); - when(task.id()).thenReturn(taskId); - - final Tasks tasks = mock(Tasks.class); - final ConsumerGroupMetadata groupMetadata = mock(ConsumerGroupMetadata.class); - final TaskManager taskManager = mock(TaskManager.class); - when(taskManager.activeRunningTaskIterable()).thenReturn(Collections.singletonList(task)); - when(taskManager.consumerGroupMetadata()).thenReturn(groupMetadata); - - final StreamsProducer producer = mock(StreamsProducer.class); - final TaskExecutionMetadata metadata = mock(TaskExecutionMetadata.class); - when(metadata.processingMode()).thenReturn(EXACTLY_ONCE_ALPHA); - when(taskManager.streamsProducerForTask(taskId)).thenReturn(producer); - when(producer.transactionInFlight()).thenReturn(true); - - final TaskExecutor taskExecutor = new TaskExecutor(tasks, taskManager, metadata, new LogContext()); - taskExecutor.commitOffsetsOrTransaction(Collections.emptyMap()); - - verify(producer).commitTransaction(Collections.emptyMap(), groupMetadata); - } -} +} \ No newline at end of file diff --git a/streams/src/test/java/org/apache/kafka/streams/processor/internals/TaskManagerTest.java b/streams/src/test/java/org/apache/kafka/streams/processor/internals/TaskManagerTest.java index 2024c7c7aa98c..ec1fb56ebe9f2 100644 --- a/streams/src/test/java/org/apache/kafka/streams/processor/internals/TaskManagerTest.java +++ b/streams/src/test/java/org/apache/kafka/streams/processor/internals/TaskManagerTest.java @@ -61,7 +61,6 @@ import org.junit.jupiter.api.Test; import org.junit.jupiter.api.extension.ExtendWith; import org.junit.jupiter.api.io.TempDir; -import org.mockito.Answers; import org.mockito.InOrder; import org.mockito.Mock; import org.mockito.junit.jupiter.MockitoExtension; @@ -122,7 +121,6 @@ import static org.mockito.ArgumentMatchers.anyString; import static org.mockito.ArgumentMatchers.argThat; import static org.mockito.ArgumentMatchers.eq; -import static org.mockito.Mockito.doNothing; import static org.mockito.Mockito.doThrow; import static org.mockito.Mockito.inOrder; import static org.mockito.Mockito.lenient; @@ -203,8 +201,6 @@ public class TaskManagerTest { private Admin adminClient; @Mock private ProcessorStateManager stateManager; - @Mock(answer = Answers.RETURNS_DEEP_STUBS) - private ProcessorStateManager.StateStoreMetadata stateStore; final StateUpdater stateUpdater = mock(StateUpdater.class); final DefaultTaskManager schedulingTaskManager = mock(DefaultTaskManager.class); @@ -938,7 +934,7 @@ public void shouldNotReturnStateUpdaterTasksInOwnedTasks() { final StreamTask activeTask = statefulTask(taskId03, taskId03ChangelogPartitions) .inState(State.RUNNING) 
.withInputPartitions(taskId03Partitions).build(); - final StandbyTask standbyTask = standbyTask(taskId02, taskId02ChangelogPartitions) + standbyTask(taskId02, taskId02ChangelogPartitions) .inState(State.RUNNING) .withInputPartitions(taskId02Partitions).build(); final TasksRegistry tasks = mock(TasksRegistry.class); @@ -1669,6 +1665,8 @@ public void shouldRethrowTaskCorruptedExceptionFromStateUpdater() { assertEquals(mkSet(taskId00, taskId01), thrown.corruptedTasks()); assertEquals("Tasks [0_1, 0_0] are corrupted and hence need to be re-initialized", thrown.getMessage()); } + + @Test public void shouldAddSubscribedTopicsFromAssignmentToTopologyMetadata() { final Map<TaskId, Set<TopicPartition>> activeTasksAssignment = mkMap( mkEntry(taskId01, mkSet(t1p1)), @@ -3157,12 +3155,6 @@ public void shouldCloseActiveTasksAndPropagateExceptionsOnCleanShutdownWithAlos( shouldCloseActiveTasksAndPropagateExceptionsOnCleanShutdown(ProcessingMode.AT_LEAST_ONCE); } - @Test - public void shouldCloseActiveTasksAndPropagateExceptionsOnCleanShutdownWithExactlyOnceV1() { - when(activeTaskCreator.streamsProducerForTask(any())).thenReturn(mock(StreamsProducer.class)); - shouldCloseActiveTasksAndPropagateExceptionsOnCleanShutdown(ProcessingMode.EXACTLY_ONCE_ALPHA); - } - @Test public void shouldCloseActiveTasksAndPropagateExceptionsOnCleanShutdownWithExactlyOnceV2() { when(activeTaskCreator.threadProducer()).thenReturn(mock(StreamsProducer.class)); @@ -3375,7 +3367,7 @@ public Set<TopicPartition> changelogPartitions() { @Test public void shouldOnlyCommitRevokedStandbyTaskAndPropagatePrepareCommitException() { - setUpTaskManager(ProcessingMode.EXACTLY_ONCE_ALPHA, false); + setUpTaskManager(ProcessingMode.EXACTLY_ONCE_V2, false); final Task task00 = new StateMachineTask(taskId00, taskId00Partitions, false, stateManager); @@ -3829,22 +3821,6 @@ public void shouldCommitViaConsumerIfEosDisabled() { verify(consumer).commitSync(offsets); } - @Test - public void shouldCommitViaProducerIfEosAlphaEnabled() { - final StreamsProducer producer = mock(StreamsProducer.class); - when(activeTaskCreator.streamsProducerForTask(any(TaskId.class))) - .thenReturn(producer); - - final Map<TopicPartition, OffsetAndMetadata> offsetsT01 = singletonMap(t1p1, new OffsetAndMetadata(0L, null)); - final Map<TopicPartition, OffsetAndMetadata> offsetsT02 = singletonMap(t1p2, new OffsetAndMetadata(1L, null)); - - shouldCommitViaProducerIfEosEnabled(ProcessingMode.EXACTLY_ONCE_ALPHA, offsetsT01, offsetsT02); - - verify(producer).commitTransaction(offsetsT01, new ConsumerGroupMetadata("appId")); - verify(producer).commitTransaction(offsetsT02, new ConsumerGroupMetadata("appId")); - verifyNoMoreInteractions(producer); - } - @Test public void shouldCommitViaProducerIfEosV2Enabled() { final StreamsProducer producer = mock(StreamsProducer.class); @@ -4578,48 +4554,6 @@ public void shouldNotFailForTimeoutExceptionOnConsumerCommit() { verify(consumer, times(2)).commitSync(any(Map.class)); } - @Test - public void shouldNotFailForTimeoutExceptionOnCommitWithEosAlpha() { - final Tasks tasks = mock(Tasks.class); - final TaskManager taskManager = setUpTaskManager(ProcessingMode.EXACTLY_ONCE_ALPHA, tasks, false); - - final StreamsProducer producer = mock(StreamsProducer.class); - when(activeTaskCreator.streamsProducerForTask(any(TaskId.class))).thenReturn(producer); - - final Map<TopicPartition, OffsetAndMetadata> offsetsT00 = singletonMap(t1p0, new OffsetAndMetadata(0L, null)); - final Map<TopicPartition, OffsetAndMetadata> offsetsT01 = singletonMap(t1p1, new OffsetAndMetadata(1L, null)); - - doThrow(new TimeoutException("KABOOM!")) - .doNothing() - .doNothing() - .doNothing() - .when(producer).commitTransaction(offsetsT00, null); - 
doNothing() - .doNothing() - .when(producer).commitTransaction(offsetsT01, null); - - final StateMachineTask task00 = new StateMachineTask(taskId00, taskId00Partitions, true, stateManager); - task00.setCommittableOffsetsAndMetadata(offsetsT00); - final StateMachineTask task01 = new StateMachineTask(taskId01, taskId01Partitions, true, stateManager); - task01.setCommittableOffsetsAndMetadata(offsetsT01); - final StateMachineTask task02 = new StateMachineTask(taskId02, taskId02Partitions, true, stateManager); - when(tasks.allTasks()).thenReturn(mkSet(task00, task01, task02)); - - task00.setCommitNeeded(); - task01.setCommitNeeded(); - - final TaskCorruptedException exception = assertThrows( - TaskCorruptedException.class, - () -> taskManager.commit(mkSet(task00, task01, task02)) - ); - assertThat( - exception.corruptedTasks(), - equalTo(Collections.singleton(taskId00)) - ); - - verify(consumer, times(2)).groupMetadata(); - } - @Test public void shouldThrowTaskCorruptedExceptionForTimeoutExceptionOnCommitWithEosV2() { final TaskManager taskManager = setUpTaskManager(ProcessingMode.EXACTLY_ONCE_V2, false); diff --git a/streams/test-utils/src/main/java/org/apache/kafka/streams/TopologyTestDriver.java b/streams/test-utils/src/main/java/org/apache/kafka/streams/TopologyTestDriver.java index 8e3c27303aefe..af2352e0ed958 100644 --- a/streams/test-utils/src/main/java/org/apache/kafka/streams/TopologyTestDriver.java +++ b/streams/test-utils/src/main/java/org/apache/kafka/streams/TopologyTestDriver.java @@ -117,7 +117,6 @@ import java.util.regex.Pattern; import static org.apache.kafka.streams.internals.StreamsConfigUtils.ProcessingMode.AT_LEAST_ONCE; -import static org.apache.kafka.streams.internals.StreamsConfigUtils.ProcessingMode.EXACTLY_ONCE_ALPHA; import static org.apache.kafka.streams.internals.StreamsConfigUtils.ProcessingMode.EXACTLY_ONCE_V2; import static org.apache.kafka.streams.state.ValueAndTimestamp.getValueOrNull; @@ -476,7 +475,6 @@ private void setupGlobalTask(final Time mockWallClockTime, } } - @SuppressWarnings("deprecation") private void setupTask(final StreamsConfig streamsConfig, final StreamsMetricsImpl streamsMetrics, final ThreadCache cache, @@ -492,7 +490,7 @@ private void setupTask(final StreamsConfig streamsConfig, final ProcessorStateManager stateManager = new ProcessorStateManager( TASK_ID, Task.TaskType.ACTIVE, - StreamsConfig.EXACTLY_ONCE.equals(streamsConfig.getString(StreamsConfig.PROCESSING_GUARANTEE_CONFIG)), + StreamsConfig.EXACTLY_ONCE_V2.equals(streamsConfig.getString(StreamsConfig.PROCESSING_GUARANTEE_CONFIG)), logContext, stateDirectory, new MockChangelogRegister(), @@ -508,7 +506,7 @@ private void setupTask(final StreamsConfig streamsConfig, processorTopology ); - final InternalProcessorContext context = new ProcessorContextImpl( + final InternalProcessorContext<Object, Object> context = new ProcessorContextImpl( TASK_ID, streamsConfig, stateManager, @@ -626,7 +624,7 @@ private void completeAllProcessableWork() { } private void commit(final Map<TopicPartition, OffsetAndMetadata> offsets) { - if (processingMode == EXACTLY_ONCE_ALPHA || processingMode == EXACTLY_ONCE_V2) { + if (processingMode == EXACTLY_ONCE_V2) { testDriverProducer.commitTransaction(offsets, new ConsumerGroupMetadata("dummy-app-id")); } else { consumer.commitSync(offsets); } @@ -1167,12 +1165,8 @@ public void close() { } static class MockChangelogRegister implements ChangelogRegister { - private final Set<TopicPartition> restoringPartitions = new HashSet<>(); - @Override - public void register(final TopicPartition partition, final ProcessorStateManager 
stateManager) { - restoringPartitions.add(partition); - } + public void register(final TopicPartition partition, final ProcessorStateManager stateManager) { } @Override public void register(final Set<TopicPartition> changelogPartitions, final ProcessorStateManager stateManager) { @@ -1182,9 +1176,7 @@ public void register(final Set<TopicPartition> changelogPartitions, final Proces } @Override - public void unregister(final Collection<TopicPartition> partitions) { - restoringPartitions.removeAll(partitions); - } + public void unregister(final Collection<TopicPartition> partitions) { } } static class MockTime implements Time { From bf450ebe5a4c24492d5dc39cb47da6e7bc0c633f Mon Sep 17 00:00:00 2001 From: "Matthias J. Sax" Date: Sat, 21 Sep 2024 07:48:38 -0700 Subject: [PATCH 4/5] MINOR: fix generics in streams-test-utils package (#17206) Reviewers: Chia-Ping Tsai --- .../java/org/apache/kafka/streams/TopologyTestDriver.java | 6 +----- .../kafka/streams/processor/MockProcessorContext.java | 7 +++---- 2 files changed, 4 insertions(+), 9 deletions(-) diff --git a/streams/test-utils/src/main/java/org/apache/kafka/streams/TopologyTestDriver.java b/streams/test-utils/src/main/java/org/apache/kafka/streams/TopologyTestDriver.java index af2352e0ed958..f6a5f53ef7d43 100644 --- a/streams/test-utils/src/main/java/org/apache/kafka/streams/TopologyTestDriver.java +++ b/streams/test-utils/src/main/java/org/apache/kafka/streams/TopologyTestDriver.java @@ -1169,11 +1169,7 @@ static class MockChangelogRegister implements ChangelogRegister { public void register(final TopicPartition partition, final ProcessorStateManager stateManager) { } @Override - public void register(final Set<TopicPartition> changelogPartitions, final ProcessorStateManager stateManager) { - for (final TopicPartition changelogPartition : changelogPartitions) { - register(changelogPartition, stateManager); - } - } + public void register(final Set<TopicPartition> changelogPartitions, final ProcessorStateManager stateManager) { } @Override public void unregister(final Collection<TopicPartition> partitions) { } diff --git a/streams/test-utils/src/main/java/org/apache/kafka/streams/processor/MockProcessorContext.java b/streams/test-utils/src/main/java/org/apache/kafka/streams/processor/MockProcessorContext.java index e535dbbd8caeb..a9b47b59da980 100644 --- a/streams/test-utils/src/main/java/org/apache/kafka/streams/processor/MockProcessorContext.java +++ b/streams/test-utils/src/main/java/org/apache/kafka/streams/processor/MockProcessorContext.java @@ -127,9 +127,9 @@ public static class CapturedForward { private final String childName; private final long timestamp; private final Headers headers; - private final KeyValue keyValue; + private final KeyValue<?, ?> keyValue; - private CapturedForward(final KeyValue keyValue, final To to, final Headers headers) { + private CapturedForward(final KeyValue<?, ?> keyValue, final To to, final Headers headers) { if (keyValue == null) { throw new IllegalArgumentException(); } @@ -165,7 +165,7 @@ public long timestamp() { * * @return A key/value pair. Not null. */ - @SuppressWarnings({"WeakerAccess", "unused"}) + @SuppressWarnings({"WeakerAccess", "unused", "rawtypes"}) public KeyValue keyValue() { return keyValue; } @@ -458,7 +458,6 @@ public <S extends StateStore> S getStateStore(final String name) { return (S) stateStores.get(name); } - @SuppressWarnings("deprecation") // removing #schedule(final long intervalMs,...) 
will fix this @Override public Cancellable schedule(final Duration interval, final PunctuationType type, From 4ed27ad135b2c7fa69601b03ddb8941b1c7455e4 Mon Sep 17 00:00:00 2001 From: Kuan-Po Tseng Date: Sun, 22 Sep 2024 21:37:47 +0800 Subject: [PATCH 5/5] tmp --- .../kafka/api/SslAdminIntegrationTest.scala | 99 +++++++++++++++---- 1 file changed, 79 insertions(+), 20 deletions(-) diff --git a/core/src/test/scala/integration/kafka/api/SslAdminIntegrationTest.scala b/core/src/test/scala/integration/kafka/api/SslAdminIntegrationTest.scala index fdaafce26599c..7f7b08b72774b 100644 --- a/core/src/test/scala/integration/kafka/api/SslAdminIntegrationTest.scala +++ b/core/src/test/scala/integration/kafka/api/SslAdminIntegrationTest.scala @@ -15,7 +15,6 @@ package kafka.api import java.util import java.util.concurrent._ import java.util.Properties - import com.yammer.metrics.core.Gauge import kafka.security.authorizer.AclAuthorizer import kafka.utils.TestUtils @@ -29,9 +28,12 @@ import org.apache.kafka.common.security.auth.{AuthenticationContext, KafkaPrinci import org.apache.kafka.common.security.authenticator.DefaultKafkaPrincipalBuilder import org.apache.kafka.server.authorizer._ import org.apache.kafka.common.network.ConnectionMode +import org.apache.kafka.metadata.authorizer.{ClusterMetadataAuthorizer, StandardAuthorizer} import org.apache.kafka.server.metrics.KafkaYammerMetrics import org.junit.jupiter.api.Assertions.{assertEquals, assertFalse, assertNotNull, assertTrue} -import org.junit.jupiter.api.{AfterEach, Test} +import org.junit.jupiter.api.AfterEach +import org.junit.jupiter.params.ParameterizedTest +import org.junit.jupiter.params.provider.ValueSource import scala.jdk.CollectionConverters._ import scala.collection.mutable @@ -84,6 +86,46 @@ object SslAdminIntegrationTest { } } + class TestableStandardAuthorizer extends StandardAuthorizer with ClusterMetadataAuthorizer { + + override def createAcls(requestContext: AuthorizableRequestContext, + aclBindings: util.List[AclBinding]): util.List[_ <: CompletionStage[AclCreateResult]] = { + lastUpdateRequestContext = Some(requestContext) + execute[AclCreateResult](aclBindings.size, () => super.createAcls(requestContext, aclBindings)) + } + + override def deleteAcls(requestContext: AuthorizableRequestContext, + aclBindingFilters: util.List[AclBindingFilter]): util.List[_ <: CompletionStage[AclDeleteResult]] = { + lastUpdateRequestContext = Some(requestContext) + execute[AclDeleteResult](aclBindingFilters.size, () => super.deleteAcls(requestContext, aclBindingFilters)) + } + + private def execute[T](batchSize: Int, action: () => util.List[_ <: CompletionStage[T]]): util.List[CompletableFuture[T]] = { + val futures = (0 until batchSize).map(_ => new CompletableFuture[T]).toList + val runnable = new Runnable { + override def run(): Unit = { + semaphore.foreach(_.acquire()) + try { + action.apply().asScala.zip(futures).foreach { case (baseFuture, resultFuture) => + try { + resultFuture.complete(baseFuture.toCompletableFuture.get()) + } catch { + case e: Throwable => resultFuture.completeExceptionally(e) + } + } + } finally { + semaphore.foreach(_.release()) + } + } + } + executor match { + case Some(executorService) => executorService.submit(runnable) + case None => runnable.run() + } + futures.asJava + } + } + class TestPrincipalBuilder extends DefaultKafkaPrincipalBuilder(null, null) { private val Pattern = "O=A (.*?),CN=(.*?)".r @@ -110,6 +152,7 @@ object SslAdminIntegrationTest { class SslAdminIntegrationTest extends 
SaslSslAdminIntegrationTest { override val zkAuthorizerClassName: String = classOf[SslAdminIntegrationTest.TestableAclAuthorizer].getName + override val kraftAuthorizerClassName: String = classOf[SslAdminIntegrationTest.TestableStandardAuthorizer].getName this.serverConfig.setProperty(BrokerSecurityConfigs.SSL_CLIENT_AUTH_CONFIG, "required") this.serverConfig.setProperty(BrokerSecurityConfigs.PRINCIPAL_BUILDER_CLASS_CONFIG, classOf[SslAdminIntegrationTest.TestPrincipalBuilder].getName) @@ -134,13 +177,15 @@ class SslAdminIntegrationTest extends SaslSslAdminIntegrationTest { super.tearDown() } - @Test - def testAclUpdatesUsingSynchronousAuthorizer(): Unit = { + @ParameterizedTest + @ValueSource(strings = Array("zk", "kraft")) + def testAclUpdatesUsingSynchronousAuthorizer(quorum: String): Unit = { verifyAclUpdates() } - @Test - def testAclUpdatesUsingAsynchronousAuthorizer(): Unit = { + @ParameterizedTest + @ValueSource(strings = Array("zk", "kraft")) + def testAclUpdatesUsingAsynchronousAuthorizer(quorum: String): Unit = { SslAdminIntegrationTest.executor = Some(Executors.newSingleThreadExecutor) verifyAclUpdates() } @@ -149,8 +194,9 @@ class SslAdminIntegrationTest extends SaslSslAdminIntegrationTest { * Verify that ACL updates using synchronous authorizer are performed synchronously * on request threads without any performance overhead introduced by a purgatory. */ - @Test - def testSynchronousAuthorizerAclUpdatesBlockRequestThreads(): Unit = { + @ParameterizedTest + @ValueSource(strings = Array("zk", "kraft")) + def testSynchronousAuthorizerAclUpdatesBlockRequestThreads(quorum: String): Unit = { val testSemaphore = new Semaphore(0) SslAdminIntegrationTest.semaphore = Some(testSemaphore) waitForNoBlockedRequestThreads() @@ -158,22 +204,27 @@ class SslAdminIntegrationTest extends SaslSslAdminIntegrationTest { // Queue requests until all threads are blocked. ACL create requests are sent to least loaded // node, so we may need more than `numRequestThreads` requests to block all threads. 
val aclFutures = mutable.Buffer[CreateAclsResult]() - while (blockedRequestThreads.size < numRequestThreads) { + // TODO: fix comment + // createAcl under kraft only handled in controller servers, brokers won't handle it + // so here we only consider number of controller io threads + val numReqThreads = if (isKRaftTest()) controllerServers.head.config.numIoThreads * controllerServers.size else numRequestThreads + while (blockedRequestThreads.size < numReqThreads) { aclFutures += createAdminClient.createAcls(List(acl2).asJava) - assertTrue(aclFutures.size < numRequestThreads * 10, - s"Request threads not blocked numRequestThreads=$numRequestThreads blocked=$blockedRequestThreads") + // TODO: add comment + assertTrue(aclFutures.size < numReqThreads * 100, + s"Request threads not blocked numRequestThreads=$numReqThreads blocked=$blockedRequestThreads aclFutures=${aclFutures.size}") } assertEquals(0, purgatoryMetric("NumDelayedOperations")) assertEquals(0, purgatoryMetric("PurgatorySize")) // Verify that operations on other clients are blocked - val describeFuture = createAdminClient.describeCluster().clusterId() - assertFalse(describeFuture.isDone) + val listPartitionReassignmentsFuture = createAdminClient.listPartitionReassignments().reassignments() + assertFalse(listPartitionReassignmentsFuture.isDone) // Release the semaphore and verify that all requests complete testSemaphore.release(aclFutures.size) waitForNoBlockedRequestThreads() - assertNotNull(describeFuture.get(10, TimeUnit.SECONDS)) + assertNotNull(listPartitionReassignmentsFuture.get(10, TimeUnit.SECONDS)) // If any of the requests time out since we were blocking the threads earlier, retry the request. val numTimedOut = aclFutures.count { future => try { @@ -196,8 +247,9 @@ class SslAdminIntegrationTest extends SaslSslAdminIntegrationTest { * Verify that ACL updates using an asynchronous authorizer are completed asynchronously * using a purgatory, enabling other requests to be processed even when ACL updates are blocked. 
*/ - @Test - def testAsynchronousAuthorizerAclUpdatesDontBlockRequestThreads(): Unit = { + @ParameterizedTest + @ValueSource(strings = Array("kraft")) + def testAsynchronousAuthorizerAclUpdatesDontBlockRequestThreads(quorum: String): Unit = { SslAdminIntegrationTest.executor = Some(Executors.newSingleThreadExecutor) val testSemaphore = new Semaphore(0) SslAdminIntegrationTest.semaphore = Some(testSemaphore) @@ -208,7 +260,7 @@ class SslAdminIntegrationTest extends SaslSslAdminIntegrationTest { waitForNoBlockedRequestThreads() assertTrue(aclFutures.forall(future => !future.all.isDone)) // Other requests should succeed even though ACL updates are blocked - assertNotNull(createAdminClient.describeCluster().clusterId().get(10, TimeUnit.SECONDS)) + assertNotNull(createAdminClient.listPartitionReassignments().reassignments().get(10, TimeUnit.SECONDS)) TestUtils.waitUntilTrue(() => purgatoryMetric("PurgatorySize") > 0, "PurgatorySize metrics not updated") TestUtils.waitUntilTrue(() => purgatoryMetric("NumDelayedOperations") > 0, "NumDelayedOperations metrics not updated") @@ -224,8 +276,12 @@ class SslAdminIntegrationTest extends SaslSslAdminIntegrationTest { val clientPrincipal = new KafkaPrincipal(KafkaPrincipal.USER_TYPE, SslAdminIntegrationTest.clientCn) def validateRequestContext(context: AuthorizableRequestContext, apiKey: ApiKeys): Unit = { - assertEquals(SecurityProtocol.SSL, context.securityProtocol) - assertEquals("SSL", context.listenerName) + // KRaft forwards createAcls requests from the broker to the controller, so after forwarding the + // security protocol is PLAINTEXT rather than SSL; only assert SSL for the non-KRaft case + if (!isKRaftTest()) { + assertEquals(SecurityProtocol.SSL, context.securityProtocol) + assertEquals("SSL", context.listenerName) + } assertEquals(clientPrincipal, context.principal) assertEquals(apiKey.id.toInt, context.requestType) assertEquals(apiKey.latestVersion.toInt, context.requestVersion) @@ -266,7 +322,10 @@ class SslAdminIntegrationTest extends SaslSslAdminIntegrationTest { requestThreads.filter(_.getState == Thread.State.WAITING).toList } - private def numRequestThreads = servers.head.config.numIoThreads * servers.size + private def numRequestThreads = { + if (isKRaftTest()) brokers.head.config.numIoThreads * (brokers.size + controllerServers.size) + else servers.head.config.numIoThreads * servers.size + } private def waitForNoBlockedRequestThreads(): Unit = { val (blockedThreads, _) = TestUtils.computeUntilTrue(blockedRequestThreads)(_.isEmpty)
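
Editor's note on the TestableStandardAuthorizer pattern above: the authorizer hands back one incomplete CompletionStage per ACL binding right away and completes them later from behind a semaphore, optionally on a separate executor, which is what lets the tests hold ACL updates "in flight" while asserting purgatory metrics and blocked request threads. Below is a minimal, self-contained sketch of that gating pattern in plain java.util.concurrent only; it deliberately does not use Kafka's Authorizer API, and GatedAclUpdater is an invented name for this illustration.

import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Semaphore;
import java.util.concurrent.TimeUnit;

public class GatedAclUpdater {

    private final Semaphore gate;           // plays the role of SslAdminIntegrationTest.semaphore
    private final ExecutorService executor; // plays the role of SslAdminIntegrationTest.executor; may be null

    public GatedAclUpdater(final Semaphore gate, final ExecutorService executor) {
        this.gate = gate;
        this.executor = executor;
    }

    // Shaped like TestableStandardAuthorizer#execute: return one incomplete
    // future per binding immediately, and complete them only once the gate opens.
    public List<CompletableFuture<Boolean>> createAcls(final int batchSize) {
        final List<CompletableFuture<Boolean>> futures = new ArrayList<>();
        for (int i = 0; i < batchSize; i++) {
            futures.add(new CompletableFuture<>());
        }
        final Runnable work = () -> {
            try {
                gate.acquire(); // block here until the test releases a permit
                futures.forEach(future -> future.complete(true));
            } catch (final InterruptedException e) {
                Thread.currentThread().interrupt();
                futures.forEach(future -> future.completeExceptionally(e));
            } finally {
                gate.release();
            }
        };
        if (executor != null) {
            executor.submit(work); // async mode: the calling thread returns immediately
        } else {
            work.run();            // sync mode: the calling thread blocks until the gate opens
        }
        return futures;
    }

    public static void main(final String[] args) throws Exception {
        final Semaphore gate = new Semaphore(0);
        final ExecutorService executor = Executors.newSingleThreadExecutor();
        final GatedAclUpdater updater = new GatedAclUpdater(gate, executor);

        final List<CompletableFuture<Boolean>> futures = updater.createAcls(3);
        System.out.println("all in flight: " + futures.stream().noneMatch(CompletableFuture::isDone));

        gate.release(); // mirrors testSemaphore.release(...) in the tests above
        for (final CompletableFuture<Boolean> future : futures) {
            future.get(10, TimeUnit.SECONDS);
        }
        System.out.println("all completed: " + futures.stream().allMatch(CompletableFuture::isDone));
        executor.shutdown();
    }
}

In the Scala original both the semaphore and the executor are Options, which is why the same authorizer serves the synchronous tests (no executor, so request threads block in acquire) and the asynchronous ones (single-thread executor, so request threads stay free and completion is observed through the purgatory metrics).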