From 2ae700a0c6061529861388eea1313b7364244260 Mon Sep 17 00:00:00 2001 From: gongxuanzhang Date: Wed, 12 Jun 2024 14:24:19 +0800 Subject: [PATCH] apply connect --- build.gradle | 8 --- .../kafka/connect/connector/Connector.java | 2 +- .../org/apache/kafka/connect/data/Struct.java | 1 - .../org/apache/kafka/connect/data/Values.java | 1 + .../kafka/connect/health/AbstractState.java | 4 +- .../kafka/connect/health/ConnectorHealth.java | 4 +- .../connect/sink/ErrantRecordReporter.java | 3 +- .../storage/SimpleHeaderConverter.java | 1 + .../connect/connector/ConnectorTest.java | 6 +- .../kafka/connect/data/ConnectSchemaTest.java | 1 + .../apache/kafka/connect/data/DateTest.java | 1 + .../kafka/connect/data/SchemaBuilderTest.java | 1 + .../connect/data/SchemaProjectorTest.java | 1 + .../apache/kafka/connect/data/TimeTest.java | 1 + .../kafka/connect/data/TimestampTest.java | 1 + .../apache/kafka/connect/data/ValuesTest.java | 1 + .../connect/header/ConnectHeaderTest.java | 1 + .../connect/header/ConnectHeadersTest.java | 1 + .../kafka/connect/sink/SinkConnectorTest.java | 6 +- .../kafka/connect/sink/SinkRecordTest.java | 1 + .../connect/source/SourceConnectorTest.java | 6 +- .../connect/source/SourceRecordTest.java | 1 + .../storage/SimpleHeaderConverterTest.java | 1 + .../connect/storage/StringConverterTest.java | 1 + .../BasicAuthSecurityRestExtension.java | 4 +- .../auth/extension/JaasBasicAuthFilter.java | 20 +++--- .../extension/PropertyFileLoginModule.java | 1 + .../BasicAuthSecurityRestExtensionTest.java | 7 +- .../extension/JaasBasicAuthFilterTest.java | 14 ++-- .../connect/file/FileStreamSinkConnector.java | 1 + .../connect/file/FileStreamSinkTask.java | 1 + .../file/FileStreamSourceConnector.java | 1 + .../connect/file/FileStreamSourceTask.java | 17 ++--- .../file/FileStreamSinkConnectorTest.java | 9 +-- .../connect/file/FileStreamSinkTaskTest.java | 2 + .../file/FileStreamSourceConnectorTest.java | 1 + .../file/FileStreamSourceTaskTest.java | 1 + ...ileStreamSinkConnectorIntegrationTest.java | 1 + ...eStreamSourceConnectorIntegrationTest.java | 1 + .../kafka/connect/json/JsonConverter.java | 25 +++---- .../connect/json/JsonConverterConfig.java | 2 +- .../kafka/connect/json/JsonDeserializer.java | 5 +- .../kafka/connect/json/JsonSerializer.java | 5 +- .../connect/json/JsonConverterConfigTest.java | 1 + .../kafka/connect/json/JsonConverterTest.java | 12 ++-- .../kafka/connect/mirror/Checkpoint.java | 9 ++- .../mirror/DefaultReplicationPolicy.java | 6 +- .../kafka/connect/mirror/Heartbeat.java | 7 +- .../mirror/IdentityReplicationPolicy.java | 4 +- .../kafka/connect/mirror/MirrorClient.java | 20 +++--- .../connect/mirror/MirrorClientConfig.java | 6 +- .../connect/mirror/RemoteClusterUtils.java | 4 +- .../kafka/connect/mirror/SourceAndTarget.java | 1 - .../connect/mirror/MirrorClientTest.java | 10 +-- .../connect/mirror/ReplicationPolicyTest.java | 2 +- .../kafka/connect/mirror/CheckpointStore.java | 1 + .../connect/mirror/ConfigPropertyFilter.java | 1 + .../mirror/DefaultConfigPropertyFilter.java | 2 +- .../connect/mirror/DefaultGroupFilter.java | 2 +- .../connect/mirror/DefaultTopicFilter.java | 2 +- .../kafka/connect/mirror/GroupFilter.java | 1 + .../mirror/MirrorCheckpointConnector.java | 1 + .../mirror/MirrorCheckpointMetrics.java | 14 ++-- .../connect/mirror/MirrorCheckpointTask.java | 24 +++---- .../mirror/MirrorCheckpointTaskConfig.java | 7 +- .../connect/mirror/MirrorConnectorConfig.java | 15 ++-- .../mirror/MirrorHeartbeatConnector.java | 10 +-- .../connect/mirror/MirrorHeartbeatTask.java | 10 +-- .../kafka/connect/mirror/MirrorHerder.java | 1 + .../kafka/connect/mirror/MirrorMaker.java | 40 +++++------ .../connect/mirror/MirrorMakerConfig.java | 23 +++--- .../connect/mirror/MirrorSourceConnector.java | 71 +++++++++---------- .../connect/mirror/MirrorSourceTask.java | 28 ++++---- .../mirror/MirrorSourceTaskConfig.java | 6 +- .../kafka/connect/mirror/MirrorUtils.java | 11 +-- .../kafka/connect/mirror/OffsetSync.java | 3 +- .../kafka/connect/mirror/OffsetSyncStore.java | 1 + .../kafka/connect/mirror/Scheduler.java | 13 ++-- .../kafka/connect/mirror/TopicFilter.java | 1 + .../formatters/CheckpointFormatter.java | 4 +- .../mirror/formatters/HeartbeatFormatter.java | 4 +- .../formatters/OffsetSyncFormatter.java | 4 +- .../connect/mirror/rest/MirrorRestServer.java | 1 + .../resources/InternalMirrorResource.java | 3 +- .../connect/mirror/CheckpointStoreTest.java | 5 +- .../mirror/MirrorCheckpointConfigTest.java | 1 + .../mirror/MirrorCheckpointConnectorTest.java | 1 + .../mirror/MirrorCheckpointTaskTest.java | 15 ++-- .../mirror/MirrorConnectorConfigTest.java | 7 +- .../mirror/MirrorHeartBeatConnectorTest.java | 17 ++--- .../mirror/MirrorHeartbeatTaskTest.java | 1 + .../connect/mirror/MirrorMakerConfigTest.java | 14 ++-- .../mirror/MirrorSourceConfigTest.java | 1 + .../mirror/MirrorSourceConnectorTest.java | 46 ++++++------ .../mirror/MirrorSourceMetricsTest.java | 1 + .../connect/mirror/MirrorSourceTaskTest.java | 11 ++- .../kafka/connect/mirror/MirrorUtilsTest.java | 1 + .../FakeForwardingAdminWithLocalMetadata.java | 1 + .../clients/admin/FakeLocalMetadataStore.java | 1 + .../DedicatedMirrorIntegrationTest.java | 1 + .../IdentityReplicationIntegrationTest.java | 5 +- .../MirrorConnectorsIntegrationBaseTest.java | 28 ++++---- ...rConnectorsIntegrationExactlyOnceTest.java | 1 + .../MirrorConnectorsIntegrationSSLTest.java | 9 ++- ...ConnectorsIntegrationTransactionsTest.java | 1 + ...hCustomForwardingAdminIntegrationTest.java | 1 + .../kafka/connect/tools/MockConnector.java | 1 + .../kafka/connect/tools/MockSinkTask.java | 1 + .../kafka/connect/tools/MockSourceTask.java | 1 + .../kafka/connect/tools/SchemaSourceTask.java | 1 + .../connect/tools/VerifiableSinkTask.java | 5 +- .../connect/tools/VerifiableSourceTask.java | 12 ++-- .../apache/kafka/connect/transforms/Cast.java | 1 + .../connect/transforms/ExtractField.java | 2 +- .../kafka/connect/transforms/Filter.java | 4 +- .../kafka/connect/transforms/InsertField.java | 2 +- .../kafka/connect/transforms/MaskField.java | 2 +- .../kafka/connect/transforms/RegexRouter.java | 1 + .../connect/transforms/SetSchemaMetadata.java | 1 + .../transforms/predicates/HasHeaderKey.java | 6 +- .../predicates/RecordIsTombstone.java | 4 +- .../predicates/TopicNameMatches.java | 8 +-- .../kafka/connect/transforms/CastTest.java | 1 + .../connect/transforms/DropHeadersTest.java | 2 +- .../connect/transforms/ExtractFieldTest.java | 3 +- .../kafka/connect/transforms/FlattenTest.java | 1 + .../connect/transforms/HeaderFromTest.java | 2 +- .../connect/transforms/HoistFieldTest.java | 1 + .../connect/transforms/InsertFieldTest.java | 1 + .../connect/transforms/InsertHeaderTest.java | 2 +- .../connect/transforms/MaskFieldTest.java | 1 + .../connect/transforms/RegexRouterTest.java | 1 + .../connect/transforms/ReplaceFieldTest.java | 1 + .../transforms/SetSchemaMetadataTest.java | 1 + .../transforms/TimestampConverterTest.java | 3 +- .../transforms/TimestampRouterTest.java | 1 + .../connect/transforms/ValueToKeyTest.java | 1 + .../field/FieldPathNotationTest.java | 1 + .../field/FieldSyntaxVersionTest.java | 1 + .../transforms/field/SingleFieldPathTest.java | 7 +- .../predicates/HasHeaderKeyTest.java | 1 + .../predicates/TopicNameMatchesTest.java | 1 + .../util/NonEmptyListValidatorTest.java | 1 + 143 files changed, 442 insertions(+), 372 deletions(-) diff --git a/build.gradle b/build.gradle index 337cb12ebccfa..2a8ab3a4be212 100644 --- a/build.gradle +++ b/build.gradle @@ -202,15 +202,7 @@ def determineCommitId() { } def excludedSpotlessModules = [':clients', - ':connect:api', - ':connect:basic-auth-extension', - ':connect:file', - ':connect:json', - ':connect:mirror', - ':connect:mirror-client', ':connect:runtime', - ':connect:test-plugins', - ':connect:transforms', ':core', ':examples', ':generator', diff --git a/connect/api/src/main/java/org/apache/kafka/connect/connector/Connector.java b/connect/api/src/main/java/org/apache/kafka/connect/connector/Connector.java index 6ea43fd99106a..927e4170f7732 100644 --- a/connect/api/src/main/java/org/apache/kafka/connect/connector/Connector.java +++ b/connect/api/src/main/java/org/apache/kafka/connect/connector/Connector.java @@ -19,8 +19,8 @@ import org.apache.kafka.common.config.Config; import org.apache.kafka.common.config.ConfigDef; import org.apache.kafka.common.config.ConfigValue; -import org.apache.kafka.connect.errors.ConnectException; import org.apache.kafka.connect.components.Versioned; +import org.apache.kafka.connect.errors.ConnectException; import java.util.List; import java.util.Map; diff --git a/connect/api/src/main/java/org/apache/kafka/connect/data/Struct.java b/connect/api/src/main/java/org/apache/kafka/connect/data/Struct.java index 1835498808b7e..6e5b81ab11537 100644 --- a/connect/api/src/main/java/org/apache/kafka/connect/data/Struct.java +++ b/connect/api/src/main/java/org/apache/kafka/connect/data/Struct.java @@ -284,4 +284,3 @@ public String toString() { } } - diff --git a/connect/api/src/main/java/org/apache/kafka/connect/data/Values.java b/connect/api/src/main/java/org/apache/kafka/connect/data/Values.java index e144b7c69b6c7..a528271d1ab44 100644 --- a/connect/api/src/main/java/org/apache/kafka/connect/data/Values.java +++ b/connect/api/src/main/java/org/apache/kafka/connect/data/Values.java @@ -19,6 +19,7 @@ import org.apache.kafka.common.utils.Utils; import org.apache.kafka.connect.data.Schema.Type; import org.apache.kafka.connect.errors.DataException; + import org.slf4j.Logger; import org.slf4j.LoggerFactory; diff --git a/connect/api/src/main/java/org/apache/kafka/connect/health/AbstractState.java b/connect/api/src/main/java/org/apache/kafka/connect/health/AbstractState.java index 356658f9320b1..2dac451a1cd3f 100644 --- a/connect/api/src/main/java/org/apache/kafka/connect/health/AbstractState.java +++ b/connect/api/src/main/java/org/apache/kafka/connect/health/AbstractState.java @@ -17,10 +17,10 @@ package org.apache.kafka.connect.health; -import java.util.Objects; - import org.apache.kafka.common.utils.Utils; +import java.util.Objects; + /** * Provides the current status for a connector or a task, along with an identifier for its Connect worker */ diff --git a/connect/api/src/main/java/org/apache/kafka/connect/health/ConnectorHealth.java b/connect/api/src/main/java/org/apache/kafka/connect/health/ConnectorHealth.java index 1f781574f52a9..876a0b3e3b94c 100644 --- a/connect/api/src/main/java/org/apache/kafka/connect/health/ConnectorHealth.java +++ b/connect/api/src/main/java/org/apache/kafka/connect/health/ConnectorHealth.java @@ -16,11 +16,11 @@ */ package org.apache.kafka.connect.health; +import org.apache.kafka.common.utils.Utils; + import java.util.Map; import java.util.Objects; -import org.apache.kafka.common.utils.Utils; - /** * Provides basic health information about the connector and its tasks. */ diff --git a/connect/api/src/main/java/org/apache/kafka/connect/sink/ErrantRecordReporter.java b/connect/api/src/main/java/org/apache/kafka/connect/sink/ErrantRecordReporter.java index b5496d2d84883..81b74a58f8ea2 100644 --- a/connect/api/src/main/java/org/apache/kafka/connect/sink/ErrantRecordReporter.java +++ b/connect/api/src/main/java/org/apache/kafka/connect/sink/ErrantRecordReporter.java @@ -16,9 +16,10 @@ */ package org.apache.kafka.connect.sink; -import java.util.concurrent.Future; import org.apache.kafka.connect.errors.ConnectException; +import java.util.concurrent.Future; + /** * Component that a {@link SinkTask} can use to report problematic records (and their corresponding problems) as it * writes them through {@link SinkTask#put(java.util.Collection)}. diff --git a/connect/api/src/main/java/org/apache/kafka/connect/storage/SimpleHeaderConverter.java b/connect/api/src/main/java/org/apache/kafka/connect/storage/SimpleHeaderConverter.java index 685905e8ead6d..3589beb5087f9 100644 --- a/connect/api/src/main/java/org/apache/kafka/connect/storage/SimpleHeaderConverter.java +++ b/connect/api/src/main/java/org/apache/kafka/connect/storage/SimpleHeaderConverter.java @@ -23,6 +23,7 @@ import org.apache.kafka.connect.data.SchemaAndValue; import org.apache.kafka.connect.data.Values; import org.apache.kafka.connect.errors.DataException; + import org.slf4j.Logger; import org.slf4j.LoggerFactory; diff --git a/connect/api/src/test/java/org/apache/kafka/connect/connector/ConnectorTest.java b/connect/api/src/test/java/org/apache/kafka/connect/connector/ConnectorTest.java index ce0c1d427f7fc..94ff9ad09e240 100644 --- a/connect/api/src/test/java/org/apache/kafka/connect/connector/ConnectorTest.java +++ b/connect/api/src/test/java/org/apache/kafka/connect/connector/ConnectorTest.java @@ -16,14 +16,14 @@ */ package org.apache.kafka.connect.connector; +import org.junit.jupiter.api.BeforeEach; +import org.junit.jupiter.api.Test; + import java.util.ArrayList; import java.util.HashMap; import java.util.List; import java.util.Map; -import org.junit.jupiter.api.BeforeEach; -import org.junit.jupiter.api.Test; - public abstract class ConnectorTest { protected ConnectorContext context; diff --git a/connect/api/src/test/java/org/apache/kafka/connect/data/ConnectSchemaTest.java b/connect/api/src/test/java/org/apache/kafka/connect/data/ConnectSchemaTest.java index 43c2342fe3b41..b4e9f81ce8163 100644 --- a/connect/api/src/test/java/org/apache/kafka/connect/data/ConnectSchemaTest.java +++ b/connect/api/src/test/java/org/apache/kafka/connect/data/ConnectSchemaTest.java @@ -17,6 +17,7 @@ package org.apache.kafka.connect.data; import org.apache.kafka.connect.errors.DataException; + import org.junit.jupiter.api.Test; import java.math.BigDecimal; diff --git a/connect/api/src/test/java/org/apache/kafka/connect/data/DateTest.java b/connect/api/src/test/java/org/apache/kafka/connect/data/DateTest.java index 2cd656c783c13..51025b44828d5 100644 --- a/connect/api/src/test/java/org/apache/kafka/connect/data/DateTest.java +++ b/connect/api/src/test/java/org/apache/kafka/connect/data/DateTest.java @@ -17,6 +17,7 @@ package org.apache.kafka.connect.data; import org.apache.kafka.connect.errors.DataException; + import org.junit.jupiter.api.Test; import java.util.Calendar; diff --git a/connect/api/src/test/java/org/apache/kafka/connect/data/SchemaBuilderTest.java b/connect/api/src/test/java/org/apache/kafka/connect/data/SchemaBuilderTest.java index ba7c574d24682..c789541ae5377 100644 --- a/connect/api/src/test/java/org/apache/kafka/connect/data/SchemaBuilderTest.java +++ b/connect/api/src/test/java/org/apache/kafka/connect/data/SchemaBuilderTest.java @@ -17,6 +17,7 @@ package org.apache.kafka.connect.data; import org.apache.kafka.connect.errors.SchemaBuilderException; + import org.junit.jupiter.api.Test; import java.nio.ByteBuffer; diff --git a/connect/api/src/test/java/org/apache/kafka/connect/data/SchemaProjectorTest.java b/connect/api/src/test/java/org/apache/kafka/connect/data/SchemaProjectorTest.java index 32e304c218ac9..4ec35d369adb9 100644 --- a/connect/api/src/test/java/org/apache/kafka/connect/data/SchemaProjectorTest.java +++ b/connect/api/src/test/java/org/apache/kafka/connect/data/SchemaProjectorTest.java @@ -19,6 +19,7 @@ import org.apache.kafka.connect.data.Schema.Type; import org.apache.kafka.connect.errors.DataException; import org.apache.kafka.connect.errors.SchemaProjectorException; + import org.junit.jupiter.api.Test; import java.math.BigDecimal; diff --git a/connect/api/src/test/java/org/apache/kafka/connect/data/TimeTest.java b/connect/api/src/test/java/org/apache/kafka/connect/data/TimeTest.java index b07ccc09145c6..58d6831542b75 100644 --- a/connect/api/src/test/java/org/apache/kafka/connect/data/TimeTest.java +++ b/connect/api/src/test/java/org/apache/kafka/connect/data/TimeTest.java @@ -17,6 +17,7 @@ package org.apache.kafka.connect.data; import org.apache.kafka.connect.errors.DataException; + import org.junit.jupiter.api.Test; import java.util.Calendar; diff --git a/connect/api/src/test/java/org/apache/kafka/connect/data/TimestampTest.java b/connect/api/src/test/java/org/apache/kafka/connect/data/TimestampTest.java index 94f67b4250453..8a7f97b49c245 100644 --- a/connect/api/src/test/java/org/apache/kafka/connect/data/TimestampTest.java +++ b/connect/api/src/test/java/org/apache/kafka/connect/data/TimestampTest.java @@ -17,6 +17,7 @@ package org.apache.kafka.connect.data; import org.apache.kafka.connect.errors.DataException; + import org.junit.jupiter.api.Test; import java.util.Calendar; diff --git a/connect/api/src/test/java/org/apache/kafka/connect/data/ValuesTest.java b/connect/api/src/test/java/org/apache/kafka/connect/data/ValuesTest.java index df3c2ade5dd83..9a96882d976e3 100644 --- a/connect/api/src/test/java/org/apache/kafka/connect/data/ValuesTest.java +++ b/connect/api/src/test/java/org/apache/kafka/connect/data/ValuesTest.java @@ -20,6 +20,7 @@ import org.apache.kafka.connect.data.Schema.Type; import org.apache.kafka.connect.data.Values.Parser; import org.apache.kafka.connect.errors.DataException; + import org.junit.jupiter.api.Test; import org.junit.jupiter.api.Timeout; diff --git a/connect/api/src/test/java/org/apache/kafka/connect/header/ConnectHeaderTest.java b/connect/api/src/test/java/org/apache/kafka/connect/header/ConnectHeaderTest.java index 8a84d44977b43..57d7634a6fdb4 100644 --- a/connect/api/src/test/java/org/apache/kafka/connect/header/ConnectHeaderTest.java +++ b/connect/api/src/test/java/org/apache/kafka/connect/header/ConnectHeaderTest.java @@ -20,6 +20,7 @@ import org.apache.kafka.connect.data.SchemaAndValue; import org.apache.kafka.connect.data.SchemaBuilder; import org.apache.kafka.connect.data.Struct; + import org.junit.jupiter.api.BeforeEach; import org.junit.jupiter.api.Test; diff --git a/connect/api/src/test/java/org/apache/kafka/connect/header/ConnectHeadersTest.java b/connect/api/src/test/java/org/apache/kafka/connect/header/ConnectHeadersTest.java index b9b9174d73da1..44073f7722927 100644 --- a/connect/api/src/test/java/org/apache/kafka/connect/header/ConnectHeadersTest.java +++ b/connect/api/src/test/java/org/apache/kafka/connect/header/ConnectHeadersTest.java @@ -28,6 +28,7 @@ import org.apache.kafka.connect.data.Values; import org.apache.kafka.connect.errors.DataException; import org.apache.kafka.connect.header.Headers.HeaderTransform; + import org.junit.jupiter.api.BeforeEach; import org.junit.jupiter.api.Test; diff --git a/connect/api/src/test/java/org/apache/kafka/connect/sink/SinkConnectorTest.java b/connect/api/src/test/java/org/apache/kafka/connect/sink/SinkConnectorTest.java index 2cf22785bd90b..dc89ff59f2937 100644 --- a/connect/api/src/test/java/org/apache/kafka/connect/sink/SinkConnectorTest.java +++ b/connect/api/src/test/java/org/apache/kafka/connect/sink/SinkConnectorTest.java @@ -16,14 +16,14 @@ */ package org.apache.kafka.connect.sink; -import java.util.List; -import java.util.Map; - import org.apache.kafka.common.config.ConfigDef; import org.apache.kafka.connect.connector.ConnectorContext; import org.apache.kafka.connect.connector.ConnectorTest; import org.apache.kafka.connect.connector.Task; +import java.util.List; +import java.util.Map; + import static org.junit.jupiter.api.Assertions.assertEquals; import static org.junit.jupiter.api.Assertions.assertSame; import static org.junit.jupiter.api.Assertions.assertTrue; diff --git a/connect/api/src/test/java/org/apache/kafka/connect/sink/SinkRecordTest.java b/connect/api/src/test/java/org/apache/kafka/connect/sink/SinkRecordTest.java index 329b28ee8b69a..099b1a81fad27 100644 --- a/connect/api/src/test/java/org/apache/kafka/connect/sink/SinkRecordTest.java +++ b/connect/api/src/test/java/org/apache/kafka/connect/sink/SinkRecordTest.java @@ -22,6 +22,7 @@ import org.apache.kafka.connect.header.ConnectHeaders; import org.apache.kafka.connect.header.Header; import org.apache.kafka.connect.header.Headers; + import org.junit.jupiter.api.BeforeEach; import org.junit.jupiter.api.Test; diff --git a/connect/api/src/test/java/org/apache/kafka/connect/source/SourceConnectorTest.java b/connect/api/src/test/java/org/apache/kafka/connect/source/SourceConnectorTest.java index 3359b1a836c3c..e1a6c54ebfd77 100644 --- a/connect/api/src/test/java/org/apache/kafka/connect/source/SourceConnectorTest.java +++ b/connect/api/src/test/java/org/apache/kafka/connect/source/SourceConnectorTest.java @@ -16,15 +16,15 @@ */ package org.apache.kafka.connect.source; -import java.util.List; -import java.util.Map; - import org.apache.kafka.common.config.ConfigDef; import org.apache.kafka.connect.connector.ConnectorContext; import org.apache.kafka.connect.connector.ConnectorTest; import org.apache.kafka.connect.connector.Task; import org.apache.kafka.connect.storage.OffsetStorageReader; +import java.util.List; +import java.util.Map; + import static org.junit.jupiter.api.Assertions.assertEquals; import static org.junit.jupiter.api.Assertions.assertSame; import static org.junit.jupiter.api.Assertions.assertTrue; diff --git a/connect/api/src/test/java/org/apache/kafka/connect/source/SourceRecordTest.java b/connect/api/src/test/java/org/apache/kafka/connect/source/SourceRecordTest.java index d098760f286db..90bd4f897df28 100644 --- a/connect/api/src/test/java/org/apache/kafka/connect/source/SourceRecordTest.java +++ b/connect/api/src/test/java/org/apache/kafka/connect/source/SourceRecordTest.java @@ -21,6 +21,7 @@ import org.apache.kafka.connect.header.ConnectHeaders; import org.apache.kafka.connect.header.Header; import org.apache.kafka.connect.header.Headers; + import org.junit.jupiter.api.BeforeEach; import org.junit.jupiter.api.Test; diff --git a/connect/api/src/test/java/org/apache/kafka/connect/storage/SimpleHeaderConverterTest.java b/connect/api/src/test/java/org/apache/kafka/connect/storage/SimpleHeaderConverterTest.java index d13b53e74d0a4..14fd0d60c142b 100644 --- a/connect/api/src/test/java/org/apache/kafka/connect/storage/SimpleHeaderConverterTest.java +++ b/connect/api/src/test/java/org/apache/kafka/connect/storage/SimpleHeaderConverterTest.java @@ -20,6 +20,7 @@ import org.apache.kafka.connect.data.Schema; import org.apache.kafka.connect.data.SchemaAndValue; import org.apache.kafka.connect.data.SchemaBuilder; + import org.junit.jupiter.api.BeforeEach; import org.junit.jupiter.api.Test; diff --git a/connect/api/src/test/java/org/apache/kafka/connect/storage/StringConverterTest.java b/connect/api/src/test/java/org/apache/kafka/connect/storage/StringConverterTest.java index f6e9bdbfa16bc..463125e09404f 100644 --- a/connect/api/src/test/java/org/apache/kafka/connect/storage/StringConverterTest.java +++ b/connect/api/src/test/java/org/apache/kafka/connect/storage/StringConverterTest.java @@ -20,6 +20,7 @@ import org.apache.kafka.common.utils.Utils; import org.apache.kafka.connect.data.Schema; import org.apache.kafka.connect.data.SchemaAndValue; + import org.junit.jupiter.api.Test; import java.nio.charset.StandardCharsets; diff --git a/connect/basic-auth-extension/src/main/java/org/apache/kafka/connect/rest/basic/auth/extension/BasicAuthSecurityRestExtension.java b/connect/basic-auth-extension/src/main/java/org/apache/kafka/connect/rest/basic/auth/extension/BasicAuthSecurityRestExtension.java index 8ba0215e142d6..58aac7994aefc 100644 --- a/connect/basic-auth-extension/src/main/java/org/apache/kafka/connect/rest/basic/auth/extension/BasicAuthSecurityRestExtension.java +++ b/connect/basic-auth-extension/src/main/java/org/apache/kafka/connect/rest/basic/auth/extension/BasicAuthSecurityRestExtension.java @@ -21,13 +21,15 @@ import org.apache.kafka.connect.errors.ConnectException; import org.apache.kafka.connect.rest.ConnectRestExtension; import org.apache.kafka.connect.rest.ConnectRestExtensionContext; + import org.slf4j.Logger; import org.slf4j.LoggerFactory; -import javax.security.auth.login.Configuration; import java.util.Map; import java.util.function.Supplier; +import javax.security.auth.login.Configuration; + /** * Provides the ability to authenticate incoming BasicAuth credentials using the configured JAAS {@link * javax.security.auth.spi.LoginModule}. An entry with the name {@code KafkaConnect} is expected in the JAAS config file configured in the diff --git a/connect/basic-auth-extension/src/main/java/org/apache/kafka/connect/rest/basic/auth/extension/JaasBasicAuthFilter.java b/connect/basic-auth-extension/src/main/java/org/apache/kafka/connect/rest/basic/auth/extension/JaasBasicAuthFilter.java index 964bd7c397771..b090ee21d449d 100644 --- a/connect/basic-auth-extension/src/main/java/org/apache/kafka/connect/rest/basic/auth/extension/JaasBasicAuthFilter.java +++ b/connect/basic-auth-extension/src/main/java/org/apache/kafka/connect/rest/basic/auth/extension/JaasBasicAuthFilter.java @@ -17,9 +17,17 @@ package org.apache.kafka.connect.rest.basic.auth.extension; +import org.apache.kafka.common.config.ConfigException; +import org.apache.kafka.connect.errors.ConnectException; + +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.nio.charset.StandardCharsets; import java.security.Principal; import java.util.ArrayList; import java.util.Arrays; +import java.util.Base64; import java.util.HashSet; import java.util.List; import java.util.Set; @@ -27,22 +35,14 @@ import java.util.regex.Pattern; import javax.annotation.Priority; -import javax.security.auth.login.Configuration; -import javax.ws.rs.HttpMethod; -import org.apache.kafka.common.config.ConfigException; -import org.apache.kafka.connect.errors.ConnectException; -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; - -import java.nio.charset.StandardCharsets; -import java.util.Base64; - import javax.security.auth.callback.Callback; import javax.security.auth.callback.CallbackHandler; import javax.security.auth.callback.NameCallback; import javax.security.auth.callback.PasswordCallback; +import javax.security.auth.login.Configuration; import javax.security.auth.login.LoginContext; import javax.security.auth.login.LoginException; +import javax.ws.rs.HttpMethod; import javax.ws.rs.Priorities; import javax.ws.rs.container.ContainerRequestContext; import javax.ws.rs.container.ContainerRequestFilter; diff --git a/connect/basic-auth-extension/src/main/java/org/apache/kafka/connect/rest/basic/auth/extension/PropertyFileLoginModule.java b/connect/basic-auth-extension/src/main/java/org/apache/kafka/connect/rest/basic/auth/extension/PropertyFileLoginModule.java index 7803dffa9416d..65f11990346a1 100644 --- a/connect/basic-auth-extension/src/main/java/org/apache/kafka/connect/rest/basic/auth/extension/PropertyFileLoginModule.java +++ b/connect/basic-auth-extension/src/main/java/org/apache/kafka/connect/rest/basic/auth/extension/PropertyFileLoginModule.java @@ -19,6 +19,7 @@ import org.apache.kafka.common.config.ConfigException; import org.apache.kafka.common.utils.Utils; + import org.slf4j.Logger; import org.slf4j.LoggerFactory; diff --git a/connect/basic-auth-extension/src/test/java/org/apache/kafka/connect/rest/basic/auth/extension/BasicAuthSecurityRestExtensionTest.java b/connect/basic-auth-extension/src/test/java/org/apache/kafka/connect/rest/basic/auth/extension/BasicAuthSecurityRestExtensionTest.java index b1b5b1ef7cdff..146bd6a2adf63 100644 --- a/connect/basic-auth-extension/src/test/java/org/apache/kafka/connect/rest/basic/auth/extension/BasicAuthSecurityRestExtensionTest.java +++ b/connect/basic-auth-extension/src/test/java/org/apache/kafka/connect/rest/basic/auth/extension/BasicAuthSecurityRestExtensionTest.java @@ -19,19 +19,20 @@ import org.apache.kafka.connect.errors.ConnectException; import org.apache.kafka.connect.rest.ConnectRestExtensionContext; + import org.junit.jupiter.api.AfterEach; import org.junit.jupiter.api.BeforeEach; import org.junit.jupiter.api.Test; import org.mockito.ArgumentCaptor; -import javax.security.auth.login.Configuration; -import javax.ws.rs.core.Configurable; - import java.io.IOException; import java.util.Collections; import java.util.concurrent.atomic.AtomicBoolean; import java.util.function.Supplier; +import javax.security.auth.login.Configuration; +import javax.ws.rs.core.Configurable; + import static org.junit.jupiter.api.Assertions.assertEquals; import static org.junit.jupiter.api.Assertions.assertNotEquals; import static org.junit.jupiter.api.Assertions.assertThrows; diff --git a/connect/basic-auth-extension/src/test/java/org/apache/kafka/connect/rest/basic/auth/extension/JaasBasicAuthFilterTest.java b/connect/basic-auth-extension/src/test/java/org/apache/kafka/connect/rest/basic/auth/extension/JaasBasicAuthFilterTest.java index 44d6ed8e33d44..24ecadcc0a09b 100644 --- a/connect/basic-auth-extension/src/test/java/org/apache/kafka/connect/rest/basic/auth/extension/JaasBasicAuthFilterTest.java +++ b/connect/basic-auth-extension/src/test/java/org/apache/kafka/connect/rest/basic/auth/extension/JaasBasicAuthFilterTest.java @@ -17,16 +17,10 @@ package org.apache.kafka.connect.rest.basic.auth.extension; -import javax.security.auth.callback.Callback; -import javax.security.auth.callback.CallbackHandler; -import javax.security.auth.callback.ChoiceCallback; -import javax.ws.rs.HttpMethod; -import javax.ws.rs.core.SecurityContext; -import javax.ws.rs.core.UriInfo; - import org.apache.kafka.common.security.authenticator.TestJaasConfig; import org.apache.kafka.connect.errors.ConnectException; import org.apache.kafka.test.TestUtils; + import org.junit.jupiter.api.Test; import org.mockito.ArgumentCaptor; @@ -42,8 +36,14 @@ import java.util.List; import java.util.Map; +import javax.security.auth.callback.Callback; +import javax.security.auth.callback.CallbackHandler; +import javax.security.auth.callback.ChoiceCallback; +import javax.ws.rs.HttpMethod; import javax.ws.rs.container.ContainerRequestContext; import javax.ws.rs.core.Response; +import javax.ws.rs.core.SecurityContext; +import javax.ws.rs.core.UriInfo; import static org.junit.jupiter.api.Assertions.assertEquals; import static org.junit.jupiter.api.Assertions.assertThrows; diff --git a/connect/file/src/main/java/org/apache/kafka/connect/file/FileStreamSinkConnector.java b/connect/file/src/main/java/org/apache/kafka/connect/file/FileStreamSinkConnector.java index 68ee27cb93972..a288ab18ae7e3 100644 --- a/connect/file/src/main/java/org/apache/kafka/connect/file/FileStreamSinkConnector.java +++ b/connect/file/src/main/java/org/apache/kafka/connect/file/FileStreamSinkConnector.java @@ -24,6 +24,7 @@ import org.apache.kafka.common.utils.AppInfoParser; import org.apache.kafka.connect.connector.Task; import org.apache.kafka.connect.sink.SinkConnector; + import org.slf4j.Logger; import org.slf4j.LoggerFactory; diff --git a/connect/file/src/main/java/org/apache/kafka/connect/file/FileStreamSinkTask.java b/connect/file/src/main/java/org/apache/kafka/connect/file/FileStreamSinkTask.java index cb19c01e6007f..0b5f112ce3083 100644 --- a/connect/file/src/main/java/org/apache/kafka/connect/file/FileStreamSinkTask.java +++ b/connect/file/src/main/java/org/apache/kafka/connect/file/FileStreamSinkTask.java @@ -22,6 +22,7 @@ import org.apache.kafka.connect.errors.ConnectException; import org.apache.kafka.connect.sink.SinkRecord; import org.apache.kafka.connect.sink.SinkTask; + import org.slf4j.Logger; import org.slf4j.LoggerFactory; diff --git a/connect/file/src/main/java/org/apache/kafka/connect/file/FileStreamSourceConnector.java b/connect/file/src/main/java/org/apache/kafka/connect/file/FileStreamSourceConnector.java index 37cdcec1b053e..f0f07a62a2f9b 100644 --- a/connect/file/src/main/java/org/apache/kafka/connect/file/FileStreamSourceConnector.java +++ b/connect/file/src/main/java/org/apache/kafka/connect/file/FileStreamSourceConnector.java @@ -25,6 +25,7 @@ import org.apache.kafka.connect.errors.ConnectException; import org.apache.kafka.connect.source.ExactlyOnceSupport; import org.apache.kafka.connect.source.SourceConnector; + import org.slf4j.Logger; import org.slf4j.LoggerFactory; diff --git a/connect/file/src/main/java/org/apache/kafka/connect/file/FileStreamSourceTask.java b/connect/file/src/main/java/org/apache/kafka/connect/file/FileStreamSourceTask.java index cda58cf2d08b5..79478c57d1fa4 100644 --- a/connect/file/src/main/java/org/apache/kafka/connect/file/FileStreamSourceTask.java +++ b/connect/file/src/main/java/org/apache/kafka/connect/file/FileStreamSourceTask.java @@ -16,6 +16,15 @@ */ package org.apache.kafka.connect.file; +import org.apache.kafka.common.config.AbstractConfig; +import org.apache.kafka.connect.data.Schema; +import org.apache.kafka.connect.errors.ConnectException; +import org.apache.kafka.connect.source.SourceRecord; +import org.apache.kafka.connect.source.SourceTask; + +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + import java.io.BufferedReader; import java.io.IOException; import java.io.InputStream; @@ -29,14 +38,6 @@ import java.util.List; import java.util.Map; -import org.apache.kafka.common.config.AbstractConfig; -import org.apache.kafka.connect.data.Schema; -import org.apache.kafka.connect.errors.ConnectException; -import org.apache.kafka.connect.source.SourceRecord; -import org.apache.kafka.connect.source.SourceTask; -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; - /** * FileStreamSourceTask reads from stdin or a file. */ diff --git a/connect/file/src/test/java/org/apache/kafka/connect/file/FileStreamSinkConnectorTest.java b/connect/file/src/test/java/org/apache/kafka/connect/file/FileStreamSinkConnectorTest.java index 0f1ab8e6e11ef..787f1fb4c9351 100644 --- a/connect/file/src/test/java/org/apache/kafka/connect/file/FileStreamSinkConnectorTest.java +++ b/connect/file/src/test/java/org/apache/kafka/connect/file/FileStreamSinkConnectorTest.java @@ -19,17 +19,18 @@ import org.apache.kafka.common.config.ConfigValue; import org.apache.kafka.connect.connector.ConnectorContext; import org.apache.kafka.connect.sink.SinkConnector; + import org.junit.jupiter.api.BeforeEach; import org.junit.jupiter.api.Test; -import static org.junit.jupiter.api.Assertions.assertEquals; -import static org.junit.jupiter.api.Assertions.assertNull; -import static org.mockito.Mockito.mock; - import java.util.HashMap; import java.util.List; import java.util.Map; +import static org.junit.jupiter.api.Assertions.assertEquals; +import static org.junit.jupiter.api.Assertions.assertNull; +import static org.mockito.Mockito.mock; + public class FileStreamSinkConnectorTest { private static final String MULTIPLE_TOPICS = "test1,test2"; diff --git a/connect/file/src/test/java/org/apache/kafka/connect/file/FileStreamSinkTaskTest.java b/connect/file/src/test/java/org/apache/kafka/connect/file/FileStreamSinkTaskTest.java index 23a28d8527c18..dde20105e3731 100644 --- a/connect/file/src/test/java/org/apache/kafka/connect/file/FileStreamSinkTaskTest.java +++ b/connect/file/src/test/java/org/apache/kafka/connect/file/FileStreamSinkTaskTest.java @@ -20,6 +20,7 @@ import org.apache.kafka.common.TopicPartition; import org.apache.kafka.connect.data.Schema; import org.apache.kafka.connect.sink.SinkRecord; + import org.junit.jupiter.api.BeforeEach; import org.junit.jupiter.api.Test; import org.junit.jupiter.api.io.TempDir; @@ -35,6 +36,7 @@ import java.util.Collections; import java.util.HashMap; import java.util.Map; + import static org.junit.jupiter.api.Assertions.assertEquals; public class FileStreamSinkTaskTest { diff --git a/connect/file/src/test/java/org/apache/kafka/connect/file/FileStreamSourceConnectorTest.java b/connect/file/src/test/java/org/apache/kafka/connect/file/FileStreamSourceConnectorTest.java index 41915913b03e3..e0c14a1e6cb19 100644 --- a/connect/file/src/test/java/org/apache/kafka/connect/file/FileStreamSourceConnectorTest.java +++ b/connect/file/src/test/java/org/apache/kafka/connect/file/FileStreamSourceConnectorTest.java @@ -23,6 +23,7 @@ import org.apache.kafka.connect.errors.ConnectException; import org.apache.kafka.connect.source.ConnectorTransactionBoundaries; import org.apache.kafka.connect.source.ExactlyOnceSupport; + import org.junit.jupiter.api.BeforeEach; import org.junit.jupiter.api.Test; diff --git a/connect/file/src/test/java/org/apache/kafka/connect/file/FileStreamSourceTaskTest.java b/connect/file/src/test/java/org/apache/kafka/connect/file/FileStreamSourceTaskTest.java index 9f65dd95ecf07..e0e77a8433c72 100644 --- a/connect/file/src/test/java/org/apache/kafka/connect/file/FileStreamSourceTaskTest.java +++ b/connect/file/src/test/java/org/apache/kafka/connect/file/FileStreamSourceTaskTest.java @@ -19,6 +19,7 @@ import org.apache.kafka.connect.source.SourceRecord; import org.apache.kafka.connect.source.SourceTaskContext; import org.apache.kafka.connect.storage.OffsetStorageReader; + import org.junit.jupiter.api.AfterEach; import org.junit.jupiter.api.BeforeEach; import org.junit.jupiter.api.Test; diff --git a/connect/file/src/test/java/org/apache/kafka/connect/file/integration/FileStreamSinkConnectorIntegrationTest.java b/connect/file/src/test/java/org/apache/kafka/connect/file/integration/FileStreamSinkConnectorIntegrationTest.java index 433c2004710c2..f6a1719a97f89 100644 --- a/connect/file/src/test/java/org/apache/kafka/connect/file/integration/FileStreamSinkConnectorIntegrationTest.java +++ b/connect/file/src/test/java/org/apache/kafka/connect/file/integration/FileStreamSinkConnectorIntegrationTest.java @@ -20,6 +20,7 @@ import org.apache.kafka.connect.file.FileStreamSinkConnector; import org.apache.kafka.connect.util.clusters.EmbeddedConnectCluster; import org.apache.kafka.test.TestUtils; + import org.junit.jupiter.api.AfterEach; import org.junit.jupiter.api.BeforeEach; import org.junit.jupiter.api.Tag; diff --git a/connect/file/src/test/java/org/apache/kafka/connect/file/integration/FileStreamSourceConnectorIntegrationTest.java b/connect/file/src/test/java/org/apache/kafka/connect/file/integration/FileStreamSourceConnectorIntegrationTest.java index 698f4fcf8d366..577b07bb5bdb4 100644 --- a/connect/file/src/test/java/org/apache/kafka/connect/file/integration/FileStreamSourceConnectorIntegrationTest.java +++ b/connect/file/src/test/java/org/apache/kafka/connect/file/integration/FileStreamSourceConnectorIntegrationTest.java @@ -20,6 +20,7 @@ import org.apache.kafka.connect.file.FileStreamSourceConnector; import org.apache.kafka.connect.util.clusters.EmbeddedConnectCluster; import org.apache.kafka.test.TestUtils; + import org.junit.jupiter.api.AfterEach; import org.junit.jupiter.api.BeforeEach; import org.junit.jupiter.api.Tag; diff --git a/connect/json/src/main/java/org/apache/kafka/connect/json/JsonConverter.java b/connect/json/src/main/java/org/apache/kafka/connect/json/JsonConverter.java index b623aaf14e8e2..c4ec44ca157b6 100644 --- a/connect/json/src/main/java/org/apache/kafka/connect/json/JsonConverter.java +++ b/connect/json/src/main/java/org/apache/kafka/connect/json/JsonConverter.java @@ -16,11 +16,6 @@ */ package org.apache.kafka.connect.json; -import com.fasterxml.jackson.databind.DeserializationFeature; -import com.fasterxml.jackson.databind.JsonNode; -import com.fasterxml.jackson.databind.node.ArrayNode; -import com.fasterxml.jackson.databind.node.JsonNodeFactory; -import com.fasterxml.jackson.databind.node.ObjectNode; import org.apache.kafka.common.cache.Cache; import org.apache.kafka.common.cache.LRUCache; import org.apache.kafka.common.cache.SynchronizedCache; @@ -29,22 +24,28 @@ import org.apache.kafka.common.utils.AppInfoParser; import org.apache.kafka.common.utils.Utils; import org.apache.kafka.connect.components.Versioned; -import org.apache.kafka.connect.data.SchemaBuilder; -import org.apache.kafka.connect.data.Schema; -import org.apache.kafka.connect.data.Struct; -import org.apache.kafka.connect.data.Field; import org.apache.kafka.connect.data.ConnectSchema; +import org.apache.kafka.connect.data.Date; +import org.apache.kafka.connect.data.Decimal; +import org.apache.kafka.connect.data.Field; +import org.apache.kafka.connect.data.Schema; import org.apache.kafka.connect.data.SchemaAndValue; -import org.apache.kafka.connect.data.Timestamp; +import org.apache.kafka.connect.data.SchemaBuilder; +import org.apache.kafka.connect.data.Struct; import org.apache.kafka.connect.data.Time; -import org.apache.kafka.connect.data.Decimal; -import org.apache.kafka.connect.data.Date; +import org.apache.kafka.connect.data.Timestamp; import org.apache.kafka.connect.errors.DataException; import org.apache.kafka.connect.storage.Converter; import org.apache.kafka.connect.storage.ConverterType; import org.apache.kafka.connect.storage.HeaderConverter; import org.apache.kafka.connect.storage.StringConverterConfig; +import com.fasterxml.jackson.databind.DeserializationFeature; +import com.fasterxml.jackson.databind.JsonNode; +import com.fasterxml.jackson.databind.node.ArrayNode; +import com.fasterxml.jackson.databind.node.JsonNodeFactory; +import com.fasterxml.jackson.databind.node.ObjectNode; + import java.io.IOException; import java.math.BigDecimal; import java.nio.ByteBuffer; diff --git a/connect/json/src/main/java/org/apache/kafka/connect/json/JsonConverterConfig.java b/connect/json/src/main/java/org/apache/kafka/connect/json/JsonConverterConfig.java index add8bec5b33c0..2718c45497823 100644 --- a/connect/json/src/main/java/org/apache/kafka/connect/json/JsonConverterConfig.java +++ b/connect/json/src/main/java/org/apache/kafka/connect/json/JsonConverterConfig.java @@ -16,13 +16,13 @@ */ package org.apache.kafka.connect.json; -import java.util.Locale; import org.apache.kafka.common.config.ConfigDef; import org.apache.kafka.common.config.ConfigDef.Importance; import org.apache.kafka.common.config.ConfigDef.Type; import org.apache.kafka.common.config.ConfigDef.Width; import org.apache.kafka.connect.storage.ConverterConfig; +import java.util.Locale; import java.util.Map; /** diff --git a/connect/json/src/main/java/org/apache/kafka/connect/json/JsonDeserializer.java b/connect/json/src/main/java/org/apache/kafka/connect/json/JsonDeserializer.java index d63ae7808c303..16c8fb26307f2 100644 --- a/connect/json/src/main/java/org/apache/kafka/connect/json/JsonDeserializer.java +++ b/connect/json/src/main/java/org/apache/kafka/connect/json/JsonDeserializer.java @@ -16,13 +16,14 @@ */ package org.apache.kafka.connect.json; +import org.apache.kafka.common.errors.SerializationException; +import org.apache.kafka.common.serialization.Deserializer; + import com.fasterxml.jackson.core.json.JsonReadFeature; import com.fasterxml.jackson.databind.DeserializationFeature; import com.fasterxml.jackson.databind.JsonNode; import com.fasterxml.jackson.databind.ObjectMapper; import com.fasterxml.jackson.databind.node.JsonNodeFactory; -import org.apache.kafka.common.errors.SerializationException; -import org.apache.kafka.common.serialization.Deserializer; import java.util.Collections; import java.util.Set; diff --git a/connect/json/src/main/java/org/apache/kafka/connect/json/JsonSerializer.java b/connect/json/src/main/java/org/apache/kafka/connect/json/JsonSerializer.java index f8b13fbb6a974..87b6980c29f65 100644 --- a/connect/json/src/main/java/org/apache/kafka/connect/json/JsonSerializer.java +++ b/connect/json/src/main/java/org/apache/kafka/connect/json/JsonSerializer.java @@ -16,12 +16,13 @@ */ package org.apache.kafka.connect.json; +import org.apache.kafka.common.errors.SerializationException; +import org.apache.kafka.common.serialization.Serializer; + import com.fasterxml.jackson.databind.JsonNode; import com.fasterxml.jackson.databind.ObjectMapper; import com.fasterxml.jackson.databind.SerializationFeature; import com.fasterxml.jackson.databind.node.JsonNodeFactory; -import org.apache.kafka.common.errors.SerializationException; -import org.apache.kafka.common.serialization.Serializer; import java.util.Collections; import java.util.Set; diff --git a/connect/json/src/test/java/org/apache/kafka/connect/json/JsonConverterConfigTest.java b/connect/json/src/test/java/org/apache/kafka/connect/json/JsonConverterConfigTest.java index efa1f60fb4a1a..930fb3bb4b84e 100644 --- a/connect/json/src/test/java/org/apache/kafka/connect/json/JsonConverterConfigTest.java +++ b/connect/json/src/test/java/org/apache/kafka/connect/json/JsonConverterConfigTest.java @@ -18,6 +18,7 @@ import org.apache.kafka.connect.storage.ConverterConfig; import org.apache.kafka.connect.storage.ConverterType; + import org.junit.jupiter.api.Test; import java.util.HashMap; diff --git a/connect/json/src/test/java/org/apache/kafka/connect/json/JsonConverterTest.java b/connect/json/src/test/java/org/apache/kafka/connect/json/JsonConverterTest.java index e6a356e761347..d79c8527b3c21 100644 --- a/connect/json/src/test/java/org/apache/kafka/connect/json/JsonConverterTest.java +++ b/connect/json/src/test/java/org/apache/kafka/connect/json/JsonConverterTest.java @@ -16,11 +16,6 @@ */ package org.apache.kafka.connect.json; -import com.fasterxml.jackson.databind.DeserializationFeature; -import com.fasterxml.jackson.databind.JsonNode; -import com.fasterxml.jackson.databind.ObjectMapper; -import com.fasterxml.jackson.databind.node.ArrayNode; -import com.fasterxml.jackson.databind.node.JsonNodeFactory; import org.apache.kafka.common.utils.AppInfoParser; import org.apache.kafka.common.utils.Utils; import org.apache.kafka.connect.data.Date; @@ -32,6 +27,13 @@ import org.apache.kafka.connect.data.Time; import org.apache.kafka.connect.data.Timestamp; import org.apache.kafka.connect.errors.DataException; + +import com.fasterxml.jackson.databind.DeserializationFeature; +import com.fasterxml.jackson.databind.JsonNode; +import com.fasterxml.jackson.databind.ObjectMapper; +import com.fasterxml.jackson.databind.node.ArrayNode; +import com.fasterxml.jackson.databind.node.JsonNodeFactory; + import org.junit.jupiter.api.BeforeEach; import org.junit.jupiter.api.Test; diff --git a/connect/mirror-client/src/main/java/org/apache/kafka/connect/mirror/Checkpoint.java b/connect/mirror-client/src/main/java/org/apache/kafka/connect/mirror/Checkpoint.java index 353d2eedb9592..603f09df84cad 100644 --- a/connect/mirror-client/src/main/java/org/apache/kafka/connect/mirror/Checkpoint.java +++ b/connect/mirror-client/src/main/java/org/apache/kafka/connect/mirror/Checkpoint.java @@ -16,17 +16,17 @@ */ package org.apache.kafka.connect.mirror; +import org.apache.kafka.clients.consumer.ConsumerRecord; +import org.apache.kafka.clients.consumer.OffsetAndMetadata; import org.apache.kafka.common.TopicPartition; import org.apache.kafka.common.protocol.types.Field; import org.apache.kafka.common.protocol.types.Schema; import org.apache.kafka.common.protocol.types.Struct; import org.apache.kafka.common.protocol.types.Type; -import org.apache.kafka.clients.consumer.ConsumerRecord; -import org.apache.kafka.clients.consumer.OffsetAndMetadata; -import java.util.Map; -import java.util.HashMap; import java.nio.ByteBuffer; +import java.util.HashMap; +import java.util.Map; import java.util.Objects; /** Checkpoint records emitted from MirrorCheckpointConnector. Encodes remote consumer group state. */ @@ -195,4 +195,3 @@ public int hashCode() { return Objects.hash(consumerGroupId, topicPartition, upstreamOffset, downstreamOffset, metadata); } } - diff --git a/connect/mirror-client/src/main/java/org/apache/kafka/connect/mirror/DefaultReplicationPolicy.java b/connect/mirror-client/src/main/java/org/apache/kafka/connect/mirror/DefaultReplicationPolicy.java index 65a98947d391b..fa2c5a75b24b9 100644 --- a/connect/mirror-client/src/main/java/org/apache/kafka/connect/mirror/DefaultReplicationPolicy.java +++ b/connect/mirror-client/src/main/java/org/apache/kafka/connect/mirror/DefaultReplicationPolicy.java @@ -18,12 +18,12 @@ import org.apache.kafka.common.Configurable; -import java.util.Map; -import java.util.regex.Pattern; - import org.slf4j.Logger; import org.slf4j.LoggerFactory; +import java.util.Map; +import java.util.regex.Pattern; + /** Defines remote topics like "us-west.topic1". The separator is customizable and defaults to a period. */ public class DefaultReplicationPolicy implements ReplicationPolicy, Configurable { diff --git a/connect/mirror-client/src/main/java/org/apache/kafka/connect/mirror/Heartbeat.java b/connect/mirror-client/src/main/java/org/apache/kafka/connect/mirror/Heartbeat.java index f25755dee9746..ab88e60439ae5 100644 --- a/connect/mirror-client/src/main/java/org/apache/kafka/connect/mirror/Heartbeat.java +++ b/connect/mirror-client/src/main/java/org/apache/kafka/connect/mirror/Heartbeat.java @@ -16,15 +16,15 @@ */ package org.apache.kafka.connect.mirror; +import org.apache.kafka.clients.consumer.ConsumerRecord; import org.apache.kafka.common.protocol.types.Field; import org.apache.kafka.common.protocol.types.Schema; import org.apache.kafka.common.protocol.types.Struct; import org.apache.kafka.common.protocol.types.Type; -import org.apache.kafka.clients.consumer.ConsumerRecord; -import java.util.Map; -import java.util.HashMap; import java.nio.ByteBuffer; +import java.util.HashMap; +import java.util.Map; /** Heartbeat message sent from MirrorHeartbeatTask to target cluster. Heartbeats are always replicated. */ public class Heartbeat { @@ -142,4 +142,3 @@ private static Schema valueSchema(short version) { return VALUE_SCHEMA_V0; } } - diff --git a/connect/mirror-client/src/main/java/org/apache/kafka/connect/mirror/IdentityReplicationPolicy.java b/connect/mirror-client/src/main/java/org/apache/kafka/connect/mirror/IdentityReplicationPolicy.java index 67ef7778f230a..16a3dfa11ff02 100644 --- a/connect/mirror-client/src/main/java/org/apache/kafka/connect/mirror/IdentityReplicationPolicy.java +++ b/connect/mirror-client/src/main/java/org/apache/kafka/connect/mirror/IdentityReplicationPolicy.java @@ -16,11 +16,11 @@ */ package org.apache.kafka.connect.mirror; -import java.util.Map; - import org.slf4j.Logger; import org.slf4j.LoggerFactory; +import java.util.Map; + /** IdentityReplicationPolicy does not rename remote topics. This is useful for migrating * from legacy MM1, or for any use-case involving one-way replication. *

diff --git a/connect/mirror-client/src/main/java/org/apache/kafka/connect/mirror/MirrorClient.java b/connect/mirror-client/src/main/java/org/apache/kafka/connect/mirror/MirrorClient.java index b40b6ba552f32..759f8aa85d99a 100644 --- a/connect/mirror-client/src/main/java/org/apache/kafka/connect/mirror/MirrorClient.java +++ b/connect/mirror-client/src/main/java/org/apache/kafka/connect/mirror/MirrorClient.java @@ -19,28 +19,28 @@ import org.apache.kafka.clients.admin.Admin; import org.apache.kafka.clients.consumer.Consumer; -import org.apache.kafka.clients.consumer.KafkaConsumer; -import org.apache.kafka.common.serialization.ByteArrayDeserializer; -import org.apache.kafka.common.TopicPartition; -import org.apache.kafka.common.KafkaException; -import org.apache.kafka.clients.consumer.OffsetAndMetadata; import org.apache.kafka.clients.consumer.ConsumerRecord; import org.apache.kafka.clients.consumer.ConsumerRecords; +import org.apache.kafka.clients.consumer.KafkaConsumer; +import org.apache.kafka.clients.consumer.OffsetAndMetadata; +import org.apache.kafka.common.KafkaException; +import org.apache.kafka.common.TopicPartition; import org.apache.kafka.common.protocol.types.SchemaException; +import org.apache.kafka.common.serialization.ByteArrayDeserializer; import org.slf4j.Logger; import org.slf4j.LoggerFactory; import java.time.Duration; -import java.util.Set; +import java.util.Collection; +import java.util.Collections; +import java.util.HashMap; import java.util.HashSet; import java.util.List; import java.util.Map; -import java.util.HashMap; -import java.util.Collections; -import java.util.Collection; -import java.util.stream.Collectors; +import java.util.Set; import java.util.concurrent.ExecutionException; +import java.util.stream.Collectors; /** Interprets MM2's internal topics (checkpoints, heartbeats) on a given cluster. *

diff --git a/connect/mirror-client/src/main/java/org/apache/kafka/connect/mirror/MirrorClientConfig.java b/connect/mirror-client/src/main/java/org/apache/kafka/connect/mirror/MirrorClientConfig.java index 053e594fbeb1d..b21b7f7246341 100644 --- a/connect/mirror-client/src/main/java/org/apache/kafka/connect/mirror/MirrorClientConfig.java +++ b/connect/mirror-client/src/main/java/org/apache/kafka/connect/mirror/MirrorClientConfig.java @@ -16,18 +16,18 @@ */ package org.apache.kafka.connect.mirror; +import org.apache.kafka.clients.CommonClientConfigs; import org.apache.kafka.clients.admin.ForwardingAdmin; import org.apache.kafka.common.KafkaException; import org.apache.kafka.common.config.AbstractConfig; import org.apache.kafka.common.config.ConfigDef; -import org.apache.kafka.common.config.ConfigDef.Type; import org.apache.kafka.common.config.ConfigDef.Importance; -import org.apache.kafka.clients.CommonClientConfigs; +import org.apache.kafka.common.config.ConfigDef.Type; import org.apache.kafka.common.security.auth.SecurityProtocol; import org.apache.kafka.common.utils.Utils; -import java.util.Map; import java.util.HashMap; +import java.util.Map; import static org.apache.kafka.common.config.ConfigDef.CaseInsensitiveValidString.in; diff --git a/connect/mirror-client/src/main/java/org/apache/kafka/connect/mirror/RemoteClusterUtils.java b/connect/mirror-client/src/main/java/org/apache/kafka/connect/mirror/RemoteClusterUtils.java index e5299eb41ee06..f67a81e355956 100644 --- a/connect/mirror-client/src/main/java/org/apache/kafka/connect/mirror/RemoteClusterUtils.java +++ b/connect/mirror-client/src/main/java/org/apache/kafka/connect/mirror/RemoteClusterUtils.java @@ -17,13 +17,13 @@ package org.apache.kafka.connect.mirror; -import org.apache.kafka.common.TopicPartition; import org.apache.kafka.clients.consumer.OffsetAndMetadata; +import org.apache.kafka.common.TopicPartition; +import java.time.Duration; import java.util.Map; import java.util.Set; import java.util.concurrent.TimeoutException; -import java.time.Duration; /** Convenience methods for multi-cluster environments. Wraps {@link MirrorClient} diff --git a/connect/mirror-client/src/main/java/org/apache/kafka/connect/mirror/SourceAndTarget.java b/connect/mirror-client/src/main/java/org/apache/kafka/connect/mirror/SourceAndTarget.java index 199141d4a5826..f9793aceed90f 100644 --- a/connect/mirror-client/src/main/java/org/apache/kafka/connect/mirror/SourceAndTarget.java +++ b/connect/mirror-client/src/main/java/org/apache/kafka/connect/mirror/SourceAndTarget.java @@ -49,4 +49,3 @@ public boolean equals(Object other) { return other != null && toString().equals(other.toString()); } } - diff --git a/connect/mirror-client/src/test/java/org/apache/kafka/connect/mirror/MirrorClientTest.java b/connect/mirror-client/src/test/java/org/apache/kafka/connect/mirror/MirrorClientTest.java index f9f20ffead324..be728a0ebe98a 100644 --- a/connect/mirror-client/src/test/java/org/apache/kafka/connect/mirror/MirrorClientTest.java +++ b/connect/mirror-client/src/test/java/org/apache/kafka/connect/mirror/MirrorClientTest.java @@ -18,17 +18,17 @@ import org.apache.kafka.common.Configurable; +import org.junit.jupiter.api.Test; + +import java.util.Arrays; import java.util.Collections; +import java.util.HashSet; import java.util.List; import java.util.Set; -import java.util.HashSet; -import java.util.Arrays; - -import org.junit.jupiter.api.Test; import static org.junit.jupiter.api.Assertions.assertEquals; -import static org.junit.jupiter.api.Assertions.assertTrue; import static org.junit.jupiter.api.Assertions.assertFalse; +import static org.junit.jupiter.api.Assertions.assertTrue; public class MirrorClientTest { diff --git a/connect/mirror-client/src/test/java/org/apache/kafka/connect/mirror/ReplicationPolicyTest.java b/connect/mirror-client/src/test/java/org/apache/kafka/connect/mirror/ReplicationPolicyTest.java index ef286680793e7..802d0b606c234 100644 --- a/connect/mirror-client/src/test/java/org/apache/kafka/connect/mirror/ReplicationPolicyTest.java +++ b/connect/mirror-client/src/test/java/org/apache/kafka/connect/mirror/ReplicationPolicyTest.java @@ -25,8 +25,8 @@ import java.util.Map; import static org.junit.jupiter.api.Assertions.assertEquals; -import static org.junit.jupiter.api.Assertions.assertTrue; import static org.junit.jupiter.api.Assertions.assertFalse; +import static org.junit.jupiter.api.Assertions.assertTrue; public class ReplicationPolicyTest { private static final DefaultReplicationPolicy DEFAULT_REPLICATION_POLICY = new DefaultReplicationPolicy(); diff --git a/connect/mirror/src/main/java/org/apache/kafka/connect/mirror/CheckpointStore.java b/connect/mirror/src/main/java/org/apache/kafka/connect/mirror/CheckpointStore.java index cbe76efecb38c..2e88977d93cea 100644 --- a/connect/mirror/src/main/java/org/apache/kafka/connect/mirror/CheckpointStore.java +++ b/connect/mirror/src/main/java/org/apache/kafka/connect/mirror/CheckpointStore.java @@ -26,6 +26,7 @@ import org.apache.kafka.connect.util.Callback; import org.apache.kafka.connect.util.KafkaBasedLog; import org.apache.kafka.connect.util.TopicAdmin; + import org.slf4j.Logger; import org.slf4j.LoggerFactory; diff --git a/connect/mirror/src/main/java/org/apache/kafka/connect/mirror/ConfigPropertyFilter.java b/connect/mirror/src/main/java/org/apache/kafka/connect/mirror/ConfigPropertyFilter.java index 9183ebcf8808b..bb4ace9a6a93c 100644 --- a/connect/mirror/src/main/java/org/apache/kafka/connect/mirror/ConfigPropertyFilter.java +++ b/connect/mirror/src/main/java/org/apache/kafka/connect/mirror/ConfigPropertyFilter.java @@ -19,6 +19,7 @@ import org.apache.kafka.common.Configurable; import org.apache.kafka.common.annotation.InterfaceStability; + import java.util.Map; /** Defines which topic configuration properties should be replicated. */ diff --git a/connect/mirror/src/main/java/org/apache/kafka/connect/mirror/DefaultConfigPropertyFilter.java b/connect/mirror/src/main/java/org/apache/kafka/connect/mirror/DefaultConfigPropertyFilter.java index 322bebbe1519f..a464e8f935228 100644 --- a/connect/mirror/src/main/java/org/apache/kafka/connect/mirror/DefaultConfigPropertyFilter.java +++ b/connect/mirror/src/main/java/org/apache/kafka/connect/mirror/DefaultConfigPropertyFilter.java @@ -18,8 +18,8 @@ import org.apache.kafka.common.config.AbstractConfig; import org.apache.kafka.common.config.ConfigDef; -import org.apache.kafka.common.config.ConfigDef.Type; import org.apache.kafka.common.config.ConfigDef.Importance; +import org.apache.kafka.common.config.ConfigDef.Type; import org.apache.kafka.common.utils.ConfigUtils; import java.util.Map; diff --git a/connect/mirror/src/main/java/org/apache/kafka/connect/mirror/DefaultGroupFilter.java b/connect/mirror/src/main/java/org/apache/kafka/connect/mirror/DefaultGroupFilter.java index e3eab88121de7..ccf0d3790f72a 100644 --- a/connect/mirror/src/main/java/org/apache/kafka/connect/mirror/DefaultGroupFilter.java +++ b/connect/mirror/src/main/java/org/apache/kafka/connect/mirror/DefaultGroupFilter.java @@ -18,8 +18,8 @@ import org.apache.kafka.common.config.AbstractConfig; import org.apache.kafka.common.config.ConfigDef; -import org.apache.kafka.common.config.ConfigDef.Type; import org.apache.kafka.common.config.ConfigDef.Importance; +import org.apache.kafka.common.config.ConfigDef.Type; import org.apache.kafka.common.utils.ConfigUtils; import java.util.Map; diff --git a/connect/mirror/src/main/java/org/apache/kafka/connect/mirror/DefaultTopicFilter.java b/connect/mirror/src/main/java/org/apache/kafka/connect/mirror/DefaultTopicFilter.java index 2b4637c272165..2fa4e193776ac 100644 --- a/connect/mirror/src/main/java/org/apache/kafka/connect/mirror/DefaultTopicFilter.java +++ b/connect/mirror/src/main/java/org/apache/kafka/connect/mirror/DefaultTopicFilter.java @@ -18,8 +18,8 @@ import org.apache.kafka.common.config.AbstractConfig; import org.apache.kafka.common.config.ConfigDef; -import org.apache.kafka.common.config.ConfigDef.Type; import org.apache.kafka.common.config.ConfigDef.Importance; +import org.apache.kafka.common.config.ConfigDef.Type; import org.apache.kafka.common.utils.ConfigUtils; import java.util.Map; diff --git a/connect/mirror/src/main/java/org/apache/kafka/connect/mirror/GroupFilter.java b/connect/mirror/src/main/java/org/apache/kafka/connect/mirror/GroupFilter.java index 0202dd5d2b358..d54274523a001 100644 --- a/connect/mirror/src/main/java/org/apache/kafka/connect/mirror/GroupFilter.java +++ b/connect/mirror/src/main/java/org/apache/kafka/connect/mirror/GroupFilter.java @@ -19,6 +19,7 @@ import org.apache.kafka.common.Configurable; import org.apache.kafka.common.annotation.InterfaceStability; + import java.util.Map; /** Defines which consumer groups should be replicated. */ diff --git a/connect/mirror/src/main/java/org/apache/kafka/connect/mirror/MirrorCheckpointConnector.java b/connect/mirror/src/main/java/org/apache/kafka/connect/mirror/MirrorCheckpointConnector.java index cb9158fd8abe6..04fe29bf6954b 100644 --- a/connect/mirror/src/main/java/org/apache/kafka/connect/mirror/MirrorCheckpointConnector.java +++ b/connect/mirror/src/main/java/org/apache/kafka/connect/mirror/MirrorCheckpointConnector.java @@ -27,6 +27,7 @@ import org.apache.kafka.connect.errors.ConnectException; import org.apache.kafka.connect.source.SourceConnector; import org.apache.kafka.connect.util.ConnectorUtils; + import org.slf4j.Logger; import org.slf4j.LoggerFactory; diff --git a/connect/mirror/src/main/java/org/apache/kafka/connect/mirror/MirrorCheckpointMetrics.java b/connect/mirror/src/main/java/org/apache/kafka/connect/mirror/MirrorCheckpointMetrics.java index 8885dcd82cc4e..8ace7d1fc3bdb 100644 --- a/connect/mirror/src/main/java/org/apache/kafka/connect/mirror/MirrorCheckpointMetrics.java +++ b/connect/mirror/src/main/java/org/apache/kafka/connect/mirror/MirrorCheckpointMetrics.java @@ -17,21 +17,21 @@ package org.apache.kafka.connect.mirror; import org.apache.kafka.common.MetricNameTemplate; +import org.apache.kafka.common.TopicPartition; import org.apache.kafka.common.metrics.Metrics; import org.apache.kafka.common.metrics.MetricsReporter; import org.apache.kafka.common.metrics.Sensor; -import org.apache.kafka.common.metrics.stats.Value; -import org.apache.kafka.common.metrics.stats.Min; -import org.apache.kafka.common.metrics.stats.Max; import org.apache.kafka.common.metrics.stats.Avg; -import org.apache.kafka.common.TopicPartition; +import org.apache.kafka.common.metrics.stats.Max; +import org.apache.kafka.common.metrics.stats.Min; +import org.apache.kafka.common.metrics.stats.Value; import java.util.Arrays; -import java.util.Set; -import java.util.HashSet; -import java.util.Map; import java.util.HashMap; +import java.util.HashSet; import java.util.LinkedHashMap; +import java.util.Map; +import java.util.Set; /** Metrics for replicated topic-partitions */ class MirrorCheckpointMetrics implements AutoCloseable { diff --git a/connect/mirror/src/main/java/org/apache/kafka/connect/mirror/MirrorCheckpointTask.java b/connect/mirror/src/main/java/org/apache/kafka/connect/mirror/MirrorCheckpointTask.java index 7f446efea5df3..e3bd4e41b1b44 100644 --- a/connect/mirror/src/main/java/org/apache/kafka/connect/mirror/MirrorCheckpointTask.java +++ b/connect/mirror/src/main/java/org/apache/kafka/connect/mirror/MirrorCheckpointTask.java @@ -18,33 +18,33 @@ import org.apache.kafka.clients.admin.Admin; import org.apache.kafka.clients.admin.ConsumerGroupDescription; +import org.apache.kafka.clients.consumer.OffsetAndMetadata; +import org.apache.kafka.clients.producer.RecordMetadata; import org.apache.kafka.common.ConsumerGroupState; import org.apache.kafka.common.KafkaFuture; -import org.apache.kafka.common.errors.UnknownMemberIdException; -import org.apache.kafka.connect.source.SourceTask; -import org.apache.kafka.connect.source.SourceRecord; -import org.apache.kafka.connect.data.Schema; import org.apache.kafka.common.TopicPartition; +import org.apache.kafka.common.errors.UnknownMemberIdException; import org.apache.kafka.common.utils.Utils; -import org.apache.kafka.clients.consumer.OffsetAndMetadata; -import org.apache.kafka.clients.producer.RecordMetadata; +import org.apache.kafka.connect.data.Schema; +import org.apache.kafka.connect.source.SourceRecord; +import org.apache.kafka.connect.source.SourceTask; import org.slf4j.Logger; import org.slf4j.LoggerFactory; +import java.time.Duration; +import java.util.ArrayList; +import java.util.Collections; import java.util.HashMap; -import java.util.Map.Entry; -import java.util.Map; import java.util.List; -import java.util.ArrayList; +import java.util.Map; +import java.util.Map.Entry; import java.util.Optional; import java.util.OptionalLong; import java.util.Set; -import java.util.Collections; +import java.util.concurrent.ExecutionException; import java.util.function.Function; import java.util.stream.Collectors; -import java.util.concurrent.ExecutionException; -import java.time.Duration; import java.util.stream.Stream; import static org.apache.kafka.connect.mirror.MirrorUtils.adminCall; diff --git a/connect/mirror/src/main/java/org/apache/kafka/connect/mirror/MirrorCheckpointTaskConfig.java b/connect/mirror/src/main/java/org/apache/kafka/connect/mirror/MirrorCheckpointTaskConfig.java index 757651383c9ef..a8db4989b297c 100644 --- a/connect/mirror/src/main/java/org/apache/kafka/connect/mirror/MirrorCheckpointTaskConfig.java +++ b/connect/mirror/src/main/java/org/apache/kafka/connect/mirror/MirrorCheckpointTaskConfig.java @@ -18,11 +18,11 @@ import org.apache.kafka.common.config.ConfigDef; +import java.util.Collections; +import java.util.HashSet; +import java.util.List; import java.util.Map; import java.util.Set; -import java.util.List; -import java.util.HashSet; -import java.util.Collections; public class MirrorCheckpointTaskConfig extends MirrorCheckpointConfig { @@ -64,4 +64,3 @@ String entityLabel() { ConfigDef.Importance.LOW, "The index of the task"); } - diff --git a/connect/mirror/src/main/java/org/apache/kafka/connect/mirror/MirrorConnectorConfig.java b/connect/mirror/src/main/java/org/apache/kafka/connect/mirror/MirrorConnectorConfig.java index 731ef2015c453..aa626cfe0a263 100644 --- a/connect/mirror/src/main/java/org/apache/kafka/connect/mirror/MirrorConnectorConfig.java +++ b/connect/mirror/src/main/java/org/apache/kafka/connect/mirror/MirrorConnectorConfig.java @@ -16,25 +16,26 @@ */ package org.apache.kafka.connect.mirror; +import org.apache.kafka.clients.CommonClientConfigs; import org.apache.kafka.clients.admin.ForwardingAdmin; import org.apache.kafka.common.KafkaException; import org.apache.kafka.common.config.AbstractConfig; import org.apache.kafka.common.config.ConfigDef; import org.apache.kafka.common.metrics.KafkaMetricsContext; -import org.apache.kafka.common.metrics.MetricsReporter; import org.apache.kafka.common.metrics.MetricsContext; -import org.apache.kafka.clients.CommonClientConfigs; +import org.apache.kafka.common.metrics.MetricsReporter; import org.apache.kafka.common.security.auth.SecurityProtocol; import org.apache.kafka.common.utils.Utils; import org.apache.kafka.connect.runtime.ConnectorConfig; -import static org.apache.kafka.clients.consumer.ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG; -import static org.apache.kafka.clients.consumer.ConsumerConfig.AUTO_OFFSET_RESET_CONFIG; -import static org.apache.kafka.common.config.ConfigDef.CaseInsensitiveValidString.in; -import java.util.Map; +import java.time.Duration; import java.util.HashMap; import java.util.List; -import java.time.Duration; +import java.util.Map; + +import static org.apache.kafka.clients.consumer.ConsumerConfig.AUTO_OFFSET_RESET_CONFIG; +import static org.apache.kafka.clients.consumer.ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG; +import static org.apache.kafka.common.config.ConfigDef.CaseInsensitiveValidString.in; /** Shared config properties used by {@link MirrorSourceConnector}, {@link MirrorCheckpointConnector}, and {@link MirrorHeartbeatConnector}. *

diff --git a/connect/mirror/src/main/java/org/apache/kafka/connect/mirror/MirrorHeartbeatConnector.java b/connect/mirror/src/main/java/org/apache/kafka/connect/mirror/MirrorHeartbeatConnector.java index 6ab9fce31bee0..f9a844fecfa13 100644 --- a/connect/mirror/src/main/java/org/apache/kafka/connect/mirror/MirrorHeartbeatConnector.java +++ b/connect/mirror/src/main/java/org/apache/kafka/connect/mirror/MirrorHeartbeatConnector.java @@ -17,16 +17,16 @@ package org.apache.kafka.connect.mirror; import org.apache.kafka.clients.admin.Admin; -import org.apache.kafka.connect.connector.Task; -import org.apache.kafka.connect.errors.ConnectException; -import org.apache.kafka.connect.source.SourceConnector; import org.apache.kafka.common.config.ConfigDef; import org.apache.kafka.common.utils.AppInfoParser; import org.apache.kafka.common.utils.Utils; +import org.apache.kafka.connect.connector.Task; +import org.apache.kafka.connect.errors.ConnectException; +import org.apache.kafka.connect.source.SourceConnector; -import java.util.Map; -import java.util.List; import java.util.Collections; +import java.util.List; +import java.util.Map; import static org.apache.kafka.connect.mirror.Heartbeat.SOURCE_CLUSTER_ALIAS_KEY; import static org.apache.kafka.connect.mirror.Heartbeat.TARGET_CLUSTER_ALIAS_KEY; diff --git a/connect/mirror/src/main/java/org/apache/kafka/connect/mirror/MirrorHeartbeatTask.java b/connect/mirror/src/main/java/org/apache/kafka/connect/mirror/MirrorHeartbeatTask.java index 6533c5162cb60..35c9c8feccb29 100644 --- a/connect/mirror/src/main/java/org/apache/kafka/connect/mirror/MirrorHeartbeatTask.java +++ b/connect/mirror/src/main/java/org/apache/kafka/connect/mirror/MirrorHeartbeatTask.java @@ -17,16 +17,16 @@ package org.apache.kafka.connect.mirror; import org.apache.kafka.clients.producer.RecordMetadata; -import org.apache.kafka.connect.source.SourceTask; -import org.apache.kafka.connect.source.SourceRecord; import org.apache.kafka.connect.data.Schema; +import org.apache.kafka.connect.source.SourceRecord; +import org.apache.kafka.connect.source.SourceTask; -import java.util.Map; -import java.util.List; +import java.time.Duration; import java.util.Collections; +import java.util.List; +import java.util.Map; import java.util.concurrent.CountDownLatch; import java.util.concurrent.TimeUnit; -import java.time.Duration; /** Emits heartbeats. */ public class MirrorHeartbeatTask extends SourceTask { diff --git a/connect/mirror/src/main/java/org/apache/kafka/connect/mirror/MirrorHerder.java b/connect/mirror/src/main/java/org/apache/kafka/connect/mirror/MirrorHerder.java index 36bcdfac9029f..f92b666e3e6aa 100644 --- a/connect/mirror/src/main/java/org/apache/kafka/connect/mirror/MirrorHerder.java +++ b/connect/mirror/src/main/java/org/apache/kafka/connect/mirror/MirrorHerder.java @@ -25,6 +25,7 @@ import org.apache.kafka.connect.runtime.rest.RestClient; import org.apache.kafka.connect.storage.ConfigBackingStore; import org.apache.kafka.connect.storage.StatusBackingStore; + import org.slf4j.Logger; import org.slf4j.LoggerFactory; diff --git a/connect/mirror/src/main/java/org/apache/kafka/connect/mirror/MirrorMaker.java b/connect/mirror/src/main/java/org/apache/kafka/connect/mirror/MirrorMaker.java index 785edc44a165c..3bc7aed02b36e 100644 --- a/connect/mirror/src/main/java/org/apache/kafka/connect/mirror/MirrorMaker.java +++ b/connect/mirror/src/main/java/org/apache/kafka/connect/mirror/MirrorMaker.java @@ -16,57 +16,57 @@ */ package org.apache.kafka.connect.mirror; +import org.apache.kafka.common.utils.Exit; import org.apache.kafka.common.utils.Time; import org.apache.kafka.common.utils.Utils; -import org.apache.kafka.common.utils.Exit; +import org.apache.kafka.connect.connector.policy.AllConnectorClientConfigOverridePolicy; +import org.apache.kafka.connect.connector.policy.ConnectorClientConfigOverridePolicy; import org.apache.kafka.connect.json.JsonConverter; import org.apache.kafka.connect.json.JsonConverterConfig; import org.apache.kafka.connect.mirror.rest.MirrorRestServer; import org.apache.kafka.connect.runtime.Herder; -import org.apache.kafka.connect.runtime.isolation.Plugins; import org.apache.kafka.connect.runtime.Worker; import org.apache.kafka.connect.runtime.WorkerConfigTransformer; import org.apache.kafka.connect.runtime.distributed.DistributedConfig; +import org.apache.kafka.connect.runtime.isolation.Plugins; import org.apache.kafka.connect.runtime.rest.RestClient; import org.apache.kafka.connect.runtime.rest.entities.ConnectorStateInfo; import org.apache.kafka.connect.runtime.rest.entities.TaskInfo; -import org.apache.kafka.connect.storage.KafkaOffsetBackingStore; -import org.apache.kafka.connect.storage.StatusBackingStore; -import org.apache.kafka.connect.storage.KafkaStatusBackingStore; import org.apache.kafka.connect.storage.ConfigBackingStore; -import org.apache.kafka.connect.storage.KafkaConfigBackingStore; import org.apache.kafka.connect.storage.Converter; +import org.apache.kafka.connect.storage.KafkaConfigBackingStore; +import org.apache.kafka.connect.storage.KafkaOffsetBackingStore; +import org.apache.kafka.connect.storage.KafkaStatusBackingStore; +import org.apache.kafka.connect.storage.StatusBackingStore; import org.apache.kafka.connect.util.Callback; import org.apache.kafka.connect.util.ConnectUtils; -import org.apache.kafka.connect.connector.policy.AllConnectorClientConfigOverridePolicy; -import org.apache.kafka.connect.connector.policy.ConnectorClientConfigOverridePolicy; - import org.apache.kafka.connect.util.SharedTopicAdmin; -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; +import net.sourceforge.argparse4j.ArgumentParsers; import net.sourceforge.argparse4j.impl.Arguments; -import net.sourceforge.argparse4j.inf.Namespace; import net.sourceforge.argparse4j.inf.ArgumentParser; import net.sourceforge.argparse4j.inf.ArgumentParserException; -import net.sourceforge.argparse4j.ArgumentParsers; +import net.sourceforge.argparse4j.inf.Namespace; + +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; +import java.io.File; import java.io.UnsupportedEncodingException; import java.net.URLEncoder; import java.nio.charset.StandardCharsets; +import java.util.Arrays; import java.util.Collections; -import java.util.concurrent.CountDownLatch; -import java.util.concurrent.TimeUnit; -import java.util.concurrent.atomic.AtomicBoolean; -import java.util.Map; import java.util.HashMap; -import java.util.Set; import java.util.HashSet; import java.util.List; -import java.util.Arrays; +import java.util.Map; import java.util.Properties; +import java.util.Set; +import java.util.concurrent.CountDownLatch; +import java.util.concurrent.TimeUnit; +import java.util.concurrent.atomic.AtomicBoolean; import java.util.stream.Collectors; -import java.io.File; import static org.apache.kafka.clients.CommonClientConfigs.CLIENT_ID_CONFIG; diff --git a/connect/mirror/src/main/java/org/apache/kafka/connect/mirror/MirrorMakerConfig.java b/connect/mirror/src/main/java/org/apache/kafka/connect/mirror/MirrorMakerConfig.java index fd672f56a6ce8..8f9f06f058eda 100644 --- a/connect/mirror/src/main/java/org/apache/kafka/connect/mirror/MirrorMakerConfig.java +++ b/connect/mirror/src/main/java/org/apache/kafka/connect/mirror/MirrorMakerConfig.java @@ -16,30 +16,29 @@ */ package org.apache.kafka.connect.mirror; -import java.util.Arrays; -import java.util.Map.Entry; - -import org.apache.kafka.common.security.auth.SecurityProtocol; -import org.apache.kafka.common.utils.Utils; +import org.apache.kafka.clients.CommonClientConfigs; import org.apache.kafka.common.config.AbstractConfig; import org.apache.kafka.common.config.ConfigDef; -import org.apache.kafka.common.config.ConfigDef.Type; import org.apache.kafka.common.config.ConfigDef.Importance; -import org.apache.kafka.common.config.provider.ConfigProvider; +import org.apache.kafka.common.config.ConfigDef.Type; import org.apache.kafka.common.config.ConfigTransformer; -import org.apache.kafka.clients.CommonClientConfigs; +import org.apache.kafka.common.config.provider.ConfigProvider; +import org.apache.kafka.common.security.auth.SecurityProtocol; +import org.apache.kafka.common.utils.Utils; import org.apache.kafka.connect.runtime.WorkerConfig; import org.apache.kafka.connect.runtime.distributed.DistributedConfig; import org.apache.kafka.connect.runtime.isolation.Plugins; import org.apache.kafka.connect.runtime.rest.RestServerConfig; -import java.util.Map; +import java.util.ArrayList; +import java.util.Arrays; +import java.util.Collections; import java.util.HashMap; +import java.util.HashSet; import java.util.List; +import java.util.Map; +import java.util.Map.Entry; import java.util.Set; -import java.util.HashSet; -import java.util.ArrayList; -import java.util.Collections; import java.util.stream.Collectors; import static org.apache.kafka.common.config.ConfigDef.CaseInsensitiveValidString.in; diff --git a/connect/mirror/src/main/java/org/apache/kafka/connect/mirror/MirrorSourceConnector.java b/connect/mirror/src/main/java/org/apache/kafka/connect/mirror/MirrorSourceConnector.java index 19c84e53cc008..16fa033c370e6 100644 --- a/connect/mirror/src/main/java/org/apache/kafka/connect/mirror/MirrorSourceConnector.java +++ b/connect/mirror/src/main/java/org/apache/kafka/connect/mirror/MirrorSourceConnector.java @@ -16,62 +16,61 @@ */ package org.apache.kafka.connect.mirror; -import java.util.HashMap; -import java.util.Map.Entry; - -import java.util.concurrent.atomic.AtomicReference; import org.apache.kafka.clients.admin.Admin; +import org.apache.kafka.clients.admin.AlterConfigOp; +import org.apache.kafka.clients.admin.Config; +import org.apache.kafka.clients.admin.ConfigEntry; +import org.apache.kafka.clients.admin.CreateTopicsOptions; +import org.apache.kafka.clients.admin.NewPartitions; +import org.apache.kafka.clients.admin.NewTopic; +import org.apache.kafka.clients.admin.TopicDescription; import org.apache.kafka.clients.consumer.ConsumerConfig; import org.apache.kafka.common.IsolationLevel; -import org.apache.kafka.common.config.ConfigValue; -import org.apache.kafka.common.errors.SecurityDisabledException; -import org.apache.kafka.connect.connector.Task; -import org.apache.kafka.connect.errors.ConnectException; -import org.apache.kafka.connect.source.ExactlyOnceSupport; -import org.apache.kafka.connect.source.SourceConnector; -import org.apache.kafka.common.config.ConfigDef; -import org.apache.kafka.common.config.ConfigResource; -import org.apache.kafka.common.acl.AclBinding; -import org.apache.kafka.common.acl.AclBindingFilter; +import org.apache.kafka.common.TopicPartition; import org.apache.kafka.common.acl.AccessControlEntry; import org.apache.kafka.common.acl.AccessControlEntryFilter; -import org.apache.kafka.common.acl.AclPermissionType; +import org.apache.kafka.common.acl.AclBinding; +import org.apache.kafka.common.acl.AclBindingFilter; import org.apache.kafka.common.acl.AclOperation; -import org.apache.kafka.common.resource.ResourceType; -import org.apache.kafka.common.resource.ResourcePattern; -import org.apache.kafka.common.resource.ResourcePatternFilter; -import org.apache.kafka.common.resource.PatternType; -import org.apache.kafka.common.TopicPartition; +import org.apache.kafka.common.acl.AclPermissionType; +import org.apache.kafka.common.config.ConfigDef; +import org.apache.kafka.common.config.ConfigResource; +import org.apache.kafka.common.config.ConfigValue; import org.apache.kafka.common.errors.InvalidPartitionsException; +import org.apache.kafka.common.errors.SecurityDisabledException; import org.apache.kafka.common.errors.UnsupportedVersionException; +import org.apache.kafka.common.resource.PatternType; +import org.apache.kafka.common.resource.ResourcePattern; +import org.apache.kafka.common.resource.ResourcePatternFilter; +import org.apache.kafka.common.resource.ResourceType; import org.apache.kafka.common.utils.AppInfoParser; import org.apache.kafka.common.utils.Utils; -import org.apache.kafka.clients.admin.AlterConfigOp; -import org.apache.kafka.clients.admin.TopicDescription; -import org.apache.kafka.clients.admin.Config; -import org.apache.kafka.clients.admin.ConfigEntry; -import org.apache.kafka.clients.admin.NewPartitions; -import org.apache.kafka.clients.admin.NewTopic; -import org.apache.kafka.clients.admin.CreateTopicsOptions; +import org.apache.kafka.connect.connector.Task; +import org.apache.kafka.connect.errors.ConnectException; +import org.apache.kafka.connect.source.ExactlyOnceSupport; +import org.apache.kafka.connect.source.SourceConnector; + +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; -import java.util.Map; -import java.util.List; import java.util.ArrayList; +import java.util.Collection; +import java.util.Collections; +import java.util.HashMap; +import java.util.HashSet; +import java.util.List; +import java.util.Map; +import java.util.Map.Entry; import java.util.Objects; import java.util.Optional; import java.util.Set; -import java.util.HashSet; -import java.util.Collection; -import java.util.Collections; +import java.util.concurrent.ExecutionException; import java.util.concurrent.atomic.AtomicBoolean; +import java.util.concurrent.atomic.AtomicReference; import java.util.function.Function; import java.util.stream.Collectors; import java.util.stream.IntStream; import java.util.stream.Stream; -import java.util.concurrent.ExecutionException; - -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; import static org.apache.kafka.connect.mirror.MirrorSourceConfig.SYNC_TOPIC_ACLS_ENABLED; import static org.apache.kafka.connect.mirror.MirrorUtils.SOURCE_CLUSTER_KEY; diff --git a/connect/mirror/src/main/java/org/apache/kafka/connect/mirror/MirrorSourceTask.java b/connect/mirror/src/main/java/org/apache/kafka/connect/mirror/MirrorSourceTask.java index e339cc1b6b05e..a875300ad717b 100644 --- a/connect/mirror/src/main/java/org/apache/kafka/connect/mirror/MirrorSourceTask.java +++ b/connect/mirror/src/main/java/org/apache/kafka/connect/mirror/MirrorSourceTask.java @@ -16,36 +16,36 @@ */ package org.apache.kafka.connect.mirror; -import org.apache.kafka.connect.data.Schema; -import org.apache.kafka.connect.source.SourceTask; -import org.apache.kafka.connect.source.SourceRecord; -import org.apache.kafka.connect.header.Headers; -import org.apache.kafka.connect.header.ConnectHeaders; import org.apache.kafka.clients.consumer.ConsumerRecord; import org.apache.kafka.clients.consumer.ConsumerRecords; import org.apache.kafka.clients.consumer.KafkaConsumer; -import org.apache.kafka.clients.producer.RecordMetadata; -import org.apache.kafka.clients.producer.ProducerRecord; import org.apache.kafka.clients.producer.KafkaProducer; +import org.apache.kafka.clients.producer.ProducerRecord; +import org.apache.kafka.clients.producer.RecordMetadata; +import org.apache.kafka.common.KafkaException; import org.apache.kafka.common.TopicPartition; -import org.apache.kafka.common.header.Header; import org.apache.kafka.common.errors.WakeupException; -import org.apache.kafka.common.KafkaException; +import org.apache.kafka.common.header.Header; import org.apache.kafka.common.utils.Utils; +import org.apache.kafka.connect.data.Schema; +import org.apache.kafka.connect.header.ConnectHeaders; +import org.apache.kafka.connect.header.Headers; +import org.apache.kafka.connect.source.SourceRecord; +import org.apache.kafka.connect.source.SourceTask; import org.slf4j.Logger; import org.slf4j.LoggerFactory; +import java.time.Duration; +import java.util.ArrayList; +import java.util.HashMap; import java.util.Iterator; import java.util.LinkedHashMap; -import java.util.Map; -import java.util.HashMap; import java.util.List; +import java.util.Map; import java.util.Set; -import java.util.ArrayList; -import java.util.stream.Collectors; import java.util.concurrent.Semaphore; -import java.time.Duration; +import java.util.stream.Collectors; /** Replicates a set of topic-partitions. */ public class MirrorSourceTask extends SourceTask { diff --git a/connect/mirror/src/main/java/org/apache/kafka/connect/mirror/MirrorSourceTaskConfig.java b/connect/mirror/src/main/java/org/apache/kafka/connect/mirror/MirrorSourceTaskConfig.java index 3a2eacd1a9eb7..f0c562bbcbb08 100644 --- a/connect/mirror/src/main/java/org/apache/kafka/connect/mirror/MirrorSourceTaskConfig.java +++ b/connect/mirror/src/main/java/org/apache/kafka/connect/mirror/MirrorSourceTaskConfig.java @@ -16,13 +16,13 @@ */ package org.apache.kafka.connect.mirror; -import org.apache.kafka.common.config.ConfigDef; import org.apache.kafka.common.TopicPartition; +import org.apache.kafka.common.config.ConfigDef; +import java.util.Collections; +import java.util.List; import java.util.Map; import java.util.Set; -import java.util.List; -import java.util.Collections; import java.util.stream.Collectors; public class MirrorSourceTaskConfig extends MirrorSourceConfig { diff --git a/connect/mirror/src/main/java/org/apache/kafka/connect/mirror/MirrorUtils.java b/connect/mirror/src/main/java/org/apache/kafka/connect/mirror/MirrorUtils.java index dd52346f21a69..d8cbba184a48c 100644 --- a/connect/mirror/src/main/java/org/apache/kafka/connect/mirror/MirrorUtils.java +++ b/connect/mirror/src/main/java/org/apache/kafka/connect/mirror/MirrorUtils.java @@ -19,8 +19,9 @@ import org.apache.kafka.clients.admin.Admin; import org.apache.kafka.clients.admin.CreateTopicsOptions; import org.apache.kafka.clients.admin.NewTopic; -import org.apache.kafka.clients.producer.KafkaProducer; import org.apache.kafka.clients.consumer.KafkaConsumer; +import org.apache.kafka.clients.producer.KafkaProducer; +import org.apache.kafka.common.TopicPartition; import org.apache.kafka.common.errors.ClusterAuthorizationException; import org.apache.kafka.common.errors.GroupAuthorizationException; import org.apache.kafka.common.errors.InvalidConfigurationException; @@ -30,18 +31,18 @@ import org.apache.kafka.common.errors.UnsupportedVersionException; import org.apache.kafka.common.serialization.ByteArrayDeserializer; import org.apache.kafka.common.serialization.ByteArraySerializer; -import org.apache.kafka.common.TopicPartition; import org.apache.kafka.connect.errors.ConnectException; import org.apache.kafka.connect.source.SourceRecord; import org.apache.kafka.connect.util.TopicAdmin; + import org.slf4j.Logger; import org.slf4j.LoggerFactory; import java.util.Arrays; -import java.util.Map; -import java.util.List; -import java.util.HashMap; import java.util.Collections; +import java.util.HashMap; +import java.util.List; +import java.util.Map; import java.util.Objects; import java.util.concurrent.Callable; import java.util.concurrent.ExecutionException; diff --git a/connect/mirror/src/main/java/org/apache/kafka/connect/mirror/OffsetSync.java b/connect/mirror/src/main/java/org/apache/kafka/connect/mirror/OffsetSync.java index e1ecb1e1dbad0..c46aac634fba5 100644 --- a/connect/mirror/src/main/java/org/apache/kafka/connect/mirror/OffsetSync.java +++ b/connect/mirror/src/main/java/org/apache/kafka/connect/mirror/OffsetSync.java @@ -16,12 +16,12 @@ */ package org.apache.kafka.connect.mirror; +import org.apache.kafka.clients.consumer.ConsumerRecord; import org.apache.kafka.common.TopicPartition; import org.apache.kafka.common.protocol.types.Field; import org.apache.kafka.common.protocol.types.Schema; import org.apache.kafka.common.protocol.types.Struct; import org.apache.kafka.common.protocol.types.Type; -import org.apache.kafka.clients.consumer.ConsumerRecord; import java.nio.ByteBuffer; @@ -117,4 +117,3 @@ byte[] recordValue() { return serializeValue().array(); } } - diff --git a/connect/mirror/src/main/java/org/apache/kafka/connect/mirror/OffsetSyncStore.java b/connect/mirror/src/main/java/org/apache/kafka/connect/mirror/OffsetSyncStore.java index 16038044ddd2b..635ab7327735e 100644 --- a/connect/mirror/src/main/java/org/apache/kafka/connect/mirror/OffsetSyncStore.java +++ b/connect/mirror/src/main/java/org/apache/kafka/connect/mirror/OffsetSyncStore.java @@ -23,6 +23,7 @@ import org.apache.kafka.common.utils.Utils; import org.apache.kafka.connect.util.KafkaBasedLog; import org.apache.kafka.connect.util.TopicAdmin; + import org.slf4j.Logger; import org.slf4j.LoggerFactory; diff --git a/connect/mirror/src/main/java/org/apache/kafka/connect/mirror/Scheduler.java b/connect/mirror/src/main/java/org/apache/kafka/connect/mirror/Scheduler.java index 642252a3e2536..9a39242d40bdf 100644 --- a/connect/mirror/src/main/java/org/apache/kafka/connect/mirror/Scheduler.java +++ b/connect/mirror/src/main/java/org/apache/kafka/connect/mirror/Scheduler.java @@ -16,15 +16,15 @@ */ package org.apache.kafka.connect.mirror; -import java.util.concurrent.ScheduledExecutorService; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.time.Duration; +import java.util.concurrent.ExecutionException; import java.util.concurrent.Executors; +import java.util.concurrent.ScheduledExecutorService; import java.util.concurrent.TimeUnit; -import java.util.concurrent.ExecutionException; import java.util.concurrent.TimeoutException; -import java.time.Duration; - -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; class Scheduler implements AutoCloseable { private static final Logger LOG = LoggerFactory.getLogger(Scheduler.class); @@ -112,4 +112,3 @@ private void executeThread(Task task, String description) { run(task, description); } } - diff --git a/connect/mirror/src/main/java/org/apache/kafka/connect/mirror/TopicFilter.java b/connect/mirror/src/main/java/org/apache/kafka/connect/mirror/TopicFilter.java index f13453f116850..902b09287800e 100644 --- a/connect/mirror/src/main/java/org/apache/kafka/connect/mirror/TopicFilter.java +++ b/connect/mirror/src/main/java/org/apache/kafka/connect/mirror/TopicFilter.java @@ -19,6 +19,7 @@ import org.apache.kafka.common.Configurable; import org.apache.kafka.common.annotation.InterfaceStability; + import java.util.Map; /** Defines which topics should be replicated. */ diff --git a/connect/mirror/src/main/java/org/apache/kafka/connect/mirror/formatters/CheckpointFormatter.java b/connect/mirror/src/main/java/org/apache/kafka/connect/mirror/formatters/CheckpointFormatter.java index 33fe695874240..8d2285a7832bb 100644 --- a/connect/mirror/src/main/java/org/apache/kafka/connect/mirror/formatters/CheckpointFormatter.java +++ b/connect/mirror/src/main/java/org/apache/kafka/connect/mirror/formatters/CheckpointFormatter.java @@ -16,12 +16,12 @@ */ package org.apache.kafka.connect.mirror.formatters; -import java.io.PrintStream; - import org.apache.kafka.clients.consumer.ConsumerRecord; import org.apache.kafka.common.MessageFormatter; import org.apache.kafka.connect.mirror.Checkpoint; +import java.io.PrintStream; + public class CheckpointFormatter implements MessageFormatter { @Override diff --git a/connect/mirror/src/main/java/org/apache/kafka/connect/mirror/formatters/HeartbeatFormatter.java b/connect/mirror/src/main/java/org/apache/kafka/connect/mirror/formatters/HeartbeatFormatter.java index a193dbe153099..b7d4be3a35545 100644 --- a/connect/mirror/src/main/java/org/apache/kafka/connect/mirror/formatters/HeartbeatFormatter.java +++ b/connect/mirror/src/main/java/org/apache/kafka/connect/mirror/formatters/HeartbeatFormatter.java @@ -16,12 +16,12 @@ */ package org.apache.kafka.connect.mirror.formatters; -import java.io.PrintStream; - import org.apache.kafka.clients.consumer.ConsumerRecord; import org.apache.kafka.common.MessageFormatter; import org.apache.kafka.connect.mirror.Heartbeat; +import java.io.PrintStream; + public class HeartbeatFormatter implements MessageFormatter { @Override diff --git a/connect/mirror/src/main/java/org/apache/kafka/connect/mirror/formatters/OffsetSyncFormatter.java b/connect/mirror/src/main/java/org/apache/kafka/connect/mirror/formatters/OffsetSyncFormatter.java index dacae600ae53d..cc5466bda965c 100644 --- a/connect/mirror/src/main/java/org/apache/kafka/connect/mirror/formatters/OffsetSyncFormatter.java +++ b/connect/mirror/src/main/java/org/apache/kafka/connect/mirror/formatters/OffsetSyncFormatter.java @@ -16,12 +16,12 @@ */ package org.apache.kafka.connect.mirror.formatters; -import java.io.PrintStream; - import org.apache.kafka.clients.consumer.ConsumerRecord; import org.apache.kafka.common.MessageFormatter; import org.apache.kafka.connect.mirror.OffsetSync; +import java.io.PrintStream; + public class OffsetSyncFormatter implements MessageFormatter { @Override diff --git a/connect/mirror/src/main/java/org/apache/kafka/connect/mirror/rest/MirrorRestServer.java b/connect/mirror/src/main/java/org/apache/kafka/connect/mirror/rest/MirrorRestServer.java index 7d24a5f14db9c..f09cb12b0f060 100644 --- a/connect/mirror/src/main/java/org/apache/kafka/connect/mirror/rest/MirrorRestServer.java +++ b/connect/mirror/src/main/java/org/apache/kafka/connect/mirror/rest/MirrorRestServer.java @@ -22,6 +22,7 @@ import org.apache.kafka.connect.runtime.rest.RestClient; import org.apache.kafka.connect.runtime.rest.RestServer; import org.apache.kafka.connect.runtime.rest.RestServerConfig; + import org.glassfish.hk2.api.TypeLiteral; import org.glassfish.hk2.utilities.binding.AbstractBinder; import org.glassfish.jersey.server.ResourceConfig; diff --git a/connect/mirror/src/main/java/org/apache/kafka/connect/mirror/rest/resources/InternalMirrorResource.java b/connect/mirror/src/main/java/org/apache/kafka/connect/mirror/rest/resources/InternalMirrorResource.java index 752fb0ffce4bd..06480bcf4a5a4 100644 --- a/connect/mirror/src/main/java/org/apache/kafka/connect/mirror/rest/resources/InternalMirrorResource.java +++ b/connect/mirror/src/main/java/org/apache/kafka/connect/mirror/rest/resources/InternalMirrorResource.java @@ -22,12 +22,13 @@ import org.apache.kafka.connect.runtime.rest.RestRequestTimeout; import org.apache.kafka.connect.runtime.rest.resources.InternalClusterResource; +import java.util.Map; + import javax.inject.Inject; import javax.ws.rs.NotFoundException; import javax.ws.rs.Path; import javax.ws.rs.core.Context; import javax.ws.rs.core.UriInfo; -import java.util.Map; @Path("/{source}/{target}/connectors") public class InternalMirrorResource extends InternalClusterResource { diff --git a/connect/mirror/src/test/java/org/apache/kafka/connect/mirror/CheckpointStoreTest.java b/connect/mirror/src/test/java/org/apache/kafka/connect/mirror/CheckpointStoreTest.java index b7b3904899f1a..476fbcceaef82 100644 --- a/connect/mirror/src/test/java/org/apache/kafka/connect/mirror/CheckpointStoreTest.java +++ b/connect/mirror/src/test/java/org/apache/kafka/connect/mirror/CheckpointStoreTest.java @@ -20,6 +20,7 @@ import org.apache.kafka.common.TopicPartition; import org.apache.kafka.common.errors.TopicAuthorizationException; import org.apache.kafka.connect.util.Callback; + import org.junit.jupiter.api.Test; import java.util.Collections; @@ -28,9 +29,9 @@ import java.util.Map; import java.util.Set; -import static org.junit.jupiter.api.Assertions.assertTrue; -import static org.junit.jupiter.api.Assertions.assertFalse; import static org.junit.jupiter.api.Assertions.assertEquals; +import static org.junit.jupiter.api.Assertions.assertFalse; +import static org.junit.jupiter.api.Assertions.assertTrue; import static org.mockito.Mockito.mock; import static org.mockito.Mockito.when; diff --git a/connect/mirror/src/test/java/org/apache/kafka/connect/mirror/MirrorCheckpointConfigTest.java b/connect/mirror/src/test/java/org/apache/kafka/connect/mirror/MirrorCheckpointConfigTest.java index 495fd2ebe1594..edbaf0955704d 100644 --- a/connect/mirror/src/test/java/org/apache/kafka/connect/mirror/MirrorCheckpointConfigTest.java +++ b/connect/mirror/src/test/java/org/apache/kafka/connect/mirror/MirrorCheckpointConfigTest.java @@ -17,6 +17,7 @@ package org.apache.kafka.connect.mirror; import org.apache.kafka.common.config.ConfigDef; + import org.junit.jupiter.api.Test; import java.util.Arrays; diff --git a/connect/mirror/src/test/java/org/apache/kafka/connect/mirror/MirrorCheckpointConnectorTest.java b/connect/mirror/src/test/java/org/apache/kafka/connect/mirror/MirrorCheckpointConnectorTest.java index ecc9fcbc11f6a..7e5b6d83587ee 100644 --- a/connect/mirror/src/test/java/org/apache/kafka/connect/mirror/MirrorCheckpointConnectorTest.java +++ b/connect/mirror/src/test/java/org/apache/kafka/connect/mirror/MirrorCheckpointConnectorTest.java @@ -20,6 +20,7 @@ import org.apache.kafka.clients.consumer.OffsetAndMetadata; import org.apache.kafka.common.TopicPartition; import org.apache.kafka.connect.errors.ConnectException; + import org.junit.jupiter.api.Test; import java.util.Arrays; diff --git a/connect/mirror/src/test/java/org/apache/kafka/connect/mirror/MirrorCheckpointTaskTest.java b/connect/mirror/src/test/java/org/apache/kafka/connect/mirror/MirrorCheckpointTaskTest.java index 0afc4f74f2f97..f4cc1e4ced6a4 100644 --- a/connect/mirror/src/test/java/org/apache/kafka/connect/mirror/MirrorCheckpointTaskTest.java +++ b/connect/mirror/src/test/java/org/apache/kafka/connect/mirror/MirrorCheckpointTaskTest.java @@ -16,26 +16,25 @@ */ package org.apache.kafka.connect.mirror; +import org.apache.kafka.clients.consumer.OffsetAndMetadata; +import org.apache.kafka.common.TopicPartition; +import org.apache.kafka.connect.source.SourceRecord; + +import org.junit.jupiter.api.Test; + +import java.util.Collections; import java.util.HashMap; import java.util.List; import java.util.Map; -import java.util.Collections; import java.util.Optional; import java.util.OptionalLong; import java.util.concurrent.ExecutionException; -import org.apache.kafka.common.TopicPartition; -import org.apache.kafka.clients.consumer.OffsetAndMetadata; -import org.apache.kafka.connect.source.SourceRecord; - -import org.junit.jupiter.api.Test; - import static org.junit.jupiter.api.Assertions.assertEquals; import static org.junit.jupiter.api.Assertions.assertFalse; import static org.junit.jupiter.api.Assertions.assertNotEquals; import static org.junit.jupiter.api.Assertions.assertNull; import static org.junit.jupiter.api.Assertions.assertTrue; - import static org.mockito.Mockito.mock; import static org.mockito.Mockito.when; diff --git a/connect/mirror/src/test/java/org/apache/kafka/connect/mirror/MirrorConnectorConfigTest.java b/connect/mirror/src/test/java/org/apache/kafka/connect/mirror/MirrorConnectorConfigTest.java index 3840c49114f15..817d1684b16e2 100644 --- a/connect/mirror/src/test/java/org/apache/kafka/connect/mirror/MirrorConnectorConfigTest.java +++ b/connect/mirror/src/test/java/org/apache/kafka/connect/mirror/MirrorConnectorConfigTest.java @@ -21,17 +21,18 @@ import org.apache.kafka.common.metrics.JmxReporter; import org.apache.kafka.common.security.auth.SecurityProtocol; import org.apache.kafka.test.MockMetricsReporter; + import org.junit.jupiter.api.Test; +import java.util.HashMap; import java.util.Locale; import java.util.Map; -import java.util.HashMap; import static org.apache.kafka.connect.mirror.TestUtils.makeProps; -import static org.junit.jupiter.api.Assertions.assertTrue; -import static org.junit.jupiter.api.Assertions.assertThrows; import static org.junit.jupiter.api.Assertions.assertEquals; import static org.junit.jupiter.api.Assertions.assertSame; +import static org.junit.jupiter.api.Assertions.assertThrows; +import static org.junit.jupiter.api.Assertions.assertTrue; public class MirrorConnectorConfigTest { diff --git a/connect/mirror/src/test/java/org/apache/kafka/connect/mirror/MirrorHeartBeatConnectorTest.java b/connect/mirror/src/test/java/org/apache/kafka/connect/mirror/MirrorHeartBeatConnectorTest.java index 0248e487c18e7..190f749d4e71b 100644 --- a/connect/mirror/src/test/java/org/apache/kafka/connect/mirror/MirrorHeartBeatConnectorTest.java +++ b/connect/mirror/src/test/java/org/apache/kafka/connect/mirror/MirrorHeartBeatConnectorTest.java @@ -16,12 +16,9 @@ */ package org.apache.kafka.connect.mirror; -import static org.apache.kafka.connect.mirror.Heartbeat.SOURCE_CLUSTER_ALIAS_KEY; -import static org.apache.kafka.connect.mirror.Heartbeat.TARGET_CLUSTER_ALIAS_KEY; -import static org.apache.kafka.connect.mirror.TestUtils.makeProps; -import static org.junit.jupiter.api.Assertions.assertEquals; -import static org.junit.jupiter.api.Assertions.assertThrows; -import static org.junit.jupiter.api.Assertions.assertTrue; +import org.apache.kafka.connect.errors.ConnectException; + +import org.junit.jupiter.api.Test; import java.util.Arrays; import java.util.Collections; @@ -30,8 +27,12 @@ import java.util.Map; import java.util.function.Function; -import org.apache.kafka.connect.errors.ConnectException; -import org.junit.jupiter.api.Test; +import static org.apache.kafka.connect.mirror.Heartbeat.SOURCE_CLUSTER_ALIAS_KEY; +import static org.apache.kafka.connect.mirror.Heartbeat.TARGET_CLUSTER_ALIAS_KEY; +import static org.apache.kafka.connect.mirror.TestUtils.makeProps; +import static org.junit.jupiter.api.Assertions.assertEquals; +import static org.junit.jupiter.api.Assertions.assertThrows; +import static org.junit.jupiter.api.Assertions.assertTrue; public class MirrorHeartBeatConnectorTest { diff --git a/connect/mirror/src/test/java/org/apache/kafka/connect/mirror/MirrorHeartbeatTaskTest.java b/connect/mirror/src/test/java/org/apache/kafka/connect/mirror/MirrorHeartbeatTaskTest.java index 39fd6dff10e30..0ffe2635d1491 100644 --- a/connect/mirror/src/test/java/org/apache/kafka/connect/mirror/MirrorHeartbeatTaskTest.java +++ b/connect/mirror/src/test/java/org/apache/kafka/connect/mirror/MirrorHeartbeatTaskTest.java @@ -17,6 +17,7 @@ package org.apache.kafka.connect.mirror; import org.apache.kafka.connect.source.SourceRecord; + import org.junit.jupiter.api.Test; import java.util.List; diff --git a/connect/mirror/src/test/java/org/apache/kafka/connect/mirror/MirrorMakerConfigTest.java b/connect/mirror/src/test/java/org/apache/kafka/connect/mirror/MirrorMakerConfigTest.java index fe6011d203368..163ebdd4b7b91 100644 --- a/connect/mirror/src/test/java/org/apache/kafka/connect/mirror/MirrorMakerConfigTest.java +++ b/connect/mirror/src/test/java/org/apache/kafka/connect/mirror/MirrorMakerConfigTest.java @@ -19,22 +19,22 @@ import org.apache.kafka.clients.CommonClientConfigs; import org.apache.kafka.clients.admin.FakeForwardingAdmin; import org.apache.kafka.clients.admin.ForwardingAdmin; +import org.apache.kafka.common.config.ConfigData; import org.apache.kafka.common.config.ConfigException; -import org.apache.kafka.common.config.types.Password; import org.apache.kafka.common.config.provider.ConfigProvider; -import org.apache.kafka.common.config.ConfigData; +import org.apache.kafka.common.config.types.Password; import org.apache.kafka.common.metrics.FakeMetricsReporter; - import org.apache.kafka.common.security.auth.SecurityProtocol; + import org.junit.jupiter.api.Test; -import java.util.Locale; -import java.util.Map; -import java.util.Set; +import java.util.Arrays; import java.util.Collections; import java.util.HashMap; -import java.util.Arrays; import java.util.List; +import java.util.Locale; +import java.util.Map; +import java.util.Set; import static org.junit.jupiter.api.Assertions.assertEquals; import static org.junit.jupiter.api.Assertions.assertFalse; diff --git a/connect/mirror/src/test/java/org/apache/kafka/connect/mirror/MirrorSourceConfigTest.java b/connect/mirror/src/test/java/org/apache/kafka/connect/mirror/MirrorSourceConfigTest.java index b703731073761..e4d6082fa7fca 100644 --- a/connect/mirror/src/test/java/org/apache/kafka/connect/mirror/MirrorSourceConfigTest.java +++ b/connect/mirror/src/test/java/org/apache/kafka/connect/mirror/MirrorSourceConfigTest.java @@ -19,6 +19,7 @@ import org.apache.kafka.common.TopicPartition; import org.apache.kafka.common.config.ConfigDef; import org.apache.kafka.common.config.ConfigException; + import org.junit.jupiter.api.Test; import java.util.Arrays; diff --git a/connect/mirror/src/test/java/org/apache/kafka/connect/mirror/MirrorSourceConnectorTest.java b/connect/mirror/src/test/java/org/apache/kafka/connect/mirror/MirrorSourceConnectorTest.java index 3aeb3837b8717..5d7a3a020f57f 100644 --- a/connect/mirror/src/test/java/org/apache/kafka/connect/mirror/MirrorSourceConnectorTest.java +++ b/connect/mirror/src/test/java/org/apache/kafka/connect/mirror/MirrorSourceConnectorTest.java @@ -16,10 +16,13 @@ */ package org.apache.kafka.connect.mirror; -import org.apache.kafka.clients.admin.AlterConfigOp; import org.apache.kafka.clients.admin.Admin; +import org.apache.kafka.clients.admin.AlterConfigOp; +import org.apache.kafka.clients.admin.Config; +import org.apache.kafka.clients.admin.ConfigEntry; import org.apache.kafka.clients.admin.DescribeAclsResult; import org.apache.kafka.clients.admin.DescribeConfigsResult; +import org.apache.kafka.clients.admin.NewTopic; import org.apache.kafka.common.KafkaFuture; import org.apache.kafka.common.TopicPartition; import org.apache.kafka.common.acl.AccessControlEntry; @@ -28,22 +31,33 @@ import org.apache.kafka.common.acl.AclPermissionType; import org.apache.kafka.common.config.ConfigResource; import org.apache.kafka.common.config.ConfigValue; +import org.apache.kafka.common.errors.SecurityDisabledException; import org.apache.kafka.common.errors.TopicAuthorizationException; import org.apache.kafka.common.errors.UnsupportedVersionException; -import org.apache.kafka.common.errors.SecurityDisabledException; import org.apache.kafka.common.resource.PatternType; import org.apache.kafka.common.resource.ResourcePattern; import org.apache.kafka.common.resource.ResourceType; -import org.apache.kafka.clients.admin.Config; import org.apache.kafka.common.utils.LogCaptureAppender; import org.apache.kafka.connect.connector.ConnectorContext; -import org.apache.kafka.clients.admin.ConfigEntry; -import org.apache.kafka.clients.admin.NewTopic; - import org.apache.kafka.connect.errors.ConnectException; import org.apache.kafka.connect.source.ExactlyOnceSupport; + import org.junit.jupiter.api.Test; +import java.util.ArrayList; +import java.util.Arrays; +import java.util.Collection; +import java.util.Collections; +import java.util.HashMap; +import java.util.HashSet; +import java.util.List; +import java.util.Map; +import java.util.Optional; +import java.util.Set; +import java.util.concurrent.ExecutionException; +import java.util.function.Function; +import java.util.stream.Collectors; + import static org.apache.kafka.clients.admin.AdminClientTestUtils.alterConfigsResult; import static org.apache.kafka.clients.consumer.ConsumerConfig.ISOLATION_LEVEL_CONFIG; import static org.apache.kafka.connect.mirror.MirrorConnectorConfig.CONSUMER_CLIENT_PREFIX; @@ -54,13 +68,13 @@ import static org.apache.kafka.connect.mirror.MirrorUtils.SOURCE_CLUSTER_KEY; import static org.apache.kafka.connect.mirror.MirrorUtils.TOPIC_KEY; import static org.apache.kafka.connect.mirror.TestUtils.makeProps; +import static org.junit.jupiter.api.Assertions.assertDoesNotThrow; import static org.junit.jupiter.api.Assertions.assertEquals; -import static org.junit.jupiter.api.Assertions.assertThrows; -import static org.junit.jupiter.api.Assertions.assertTrue; import static org.junit.jupiter.api.Assertions.assertFalse; import static org.junit.jupiter.api.Assertions.assertNotNull; import static org.junit.jupiter.api.Assertions.assertNull; -import static org.junit.jupiter.api.Assertions.assertDoesNotThrow; +import static org.junit.jupiter.api.Assertions.assertThrows; +import static org.junit.jupiter.api.Assertions.assertTrue; import static org.mockito.ArgumentMatchers.isA; import static org.mockito.Mockito.any; import static org.mockito.Mockito.doAnswer; @@ -75,20 +89,6 @@ import static org.mockito.Mockito.verifyNoInteractions; import static org.mockito.Mockito.when; -import java.util.ArrayList; -import java.util.Arrays; -import java.util.Collection; -import java.util.Collections; -import java.util.HashMap; -import java.util.HashSet; -import java.util.List; -import java.util.Map; -import java.util.Optional; -import java.util.Set; -import java.util.concurrent.ExecutionException; -import java.util.function.Function; -import java.util.stream.Collectors; - public class MirrorSourceConnectorTest { private ConfigPropertyFilter getConfigPropertyFilter() { return prop -> true; diff --git a/connect/mirror/src/test/java/org/apache/kafka/connect/mirror/MirrorSourceMetricsTest.java b/connect/mirror/src/test/java/org/apache/kafka/connect/mirror/MirrorSourceMetricsTest.java index ae43c72dcb066..1aafdf1626417 100644 --- a/connect/mirror/src/test/java/org/apache/kafka/connect/mirror/MirrorSourceMetricsTest.java +++ b/connect/mirror/src/test/java/org/apache/kafka/connect/mirror/MirrorSourceMetricsTest.java @@ -20,6 +20,7 @@ import org.apache.kafka.common.metrics.KafkaMetric; import org.apache.kafka.common.metrics.MetricsReporter; import org.apache.kafka.connect.runtime.ConnectorConfig; + import org.junit.jupiter.api.BeforeEach; import org.junit.jupiter.api.Test; diff --git a/connect/mirror/src/test/java/org/apache/kafka/connect/mirror/MirrorSourceTaskTest.java b/connect/mirror/src/test/java/org/apache/kafka/connect/mirror/MirrorSourceTaskTest.java index 5a5046fd0f6dc..8e513ffd82bc0 100644 --- a/connect/mirror/src/test/java/org/apache/kafka/connect/mirror/MirrorSourceTaskTest.java +++ b/connect/mirror/src/test/java/org/apache/kafka/connect/mirror/MirrorSourceTaskTest.java @@ -22,17 +22,17 @@ import org.apache.kafka.clients.producer.Callback; import org.apache.kafka.clients.producer.KafkaProducer; import org.apache.kafka.clients.producer.RecordMetadata; -import org.apache.kafka.common.header.Header; -import org.apache.kafka.common.header.internals.RecordHeader; -import org.apache.kafka.common.record.TimestampType; import org.apache.kafka.common.TopicPartition; +import org.apache.kafka.common.header.Header; import org.apache.kafka.common.header.Headers; +import org.apache.kafka.common.header.internals.RecordHeader; import org.apache.kafka.common.header.internals.RecordHeaders; +import org.apache.kafka.common.record.TimestampType; import org.apache.kafka.connect.mirror.MirrorSourceTask.PartitionState; import org.apache.kafka.connect.source.SourceRecord; - import org.apache.kafka.connect.source.SourceTaskContext; import org.apache.kafka.connect.storage.OffsetStorageReader; + import org.junit.jupiter.api.Test; import org.mockito.ArgumentCaptor; @@ -54,12 +54,11 @@ import static org.mockito.ArgumentMatchers.anyMap; import static org.mockito.Mockito.doReturn; import static org.mockito.Mockito.mock; - import static org.mockito.Mockito.times; import static org.mockito.Mockito.verify; +import static org.mockito.Mockito.verifyNoInteractions; import static org.mockito.Mockito.verifyNoMoreInteractions; import static org.mockito.Mockito.when; -import static org.mockito.Mockito.verifyNoInteractions; public class MirrorSourceTaskTest { diff --git a/connect/mirror/src/test/java/org/apache/kafka/connect/mirror/MirrorUtilsTest.java b/connect/mirror/src/test/java/org/apache/kafka/connect/mirror/MirrorUtilsTest.java index 99cdb610759f6..e6de8a58f7b26 100644 --- a/connect/mirror/src/test/java/org/apache/kafka/connect/mirror/MirrorUtilsTest.java +++ b/connect/mirror/src/test/java/org/apache/kafka/connect/mirror/MirrorUtilsTest.java @@ -26,6 +26,7 @@ import org.apache.kafka.common.errors.TopicExistsException; import org.apache.kafka.common.errors.UnsupportedVersionException; import org.apache.kafka.connect.errors.ConnectException; + import org.junit.jupiter.api.Test; import java.util.Collections; diff --git a/connect/mirror/src/test/java/org/apache/kafka/connect/mirror/clients/admin/FakeForwardingAdminWithLocalMetadata.java b/connect/mirror/src/test/java/org/apache/kafka/connect/mirror/clients/admin/FakeForwardingAdminWithLocalMetadata.java index 3ac8a8b17f00d..1f2f56166a23f 100644 --- a/connect/mirror/src/test/java/org/apache/kafka/connect/mirror/clients/admin/FakeForwardingAdminWithLocalMetadata.java +++ b/connect/mirror/src/test/java/org/apache/kafka/connect/mirror/clients/admin/FakeForwardingAdminWithLocalMetadata.java @@ -32,6 +32,7 @@ import org.apache.kafka.common.acl.AclBinding; import org.apache.kafka.common.config.ConfigResource; import org.apache.kafka.common.errors.TopicExistsException; + import org.slf4j.Logger; import org.slf4j.LoggerFactory; diff --git a/connect/mirror/src/test/java/org/apache/kafka/connect/mirror/clients/admin/FakeLocalMetadataStore.java b/connect/mirror/src/test/java/org/apache/kafka/connect/mirror/clients/admin/FakeLocalMetadataStore.java index 99706d4eaeaac..1c08cbaf72ef9 100644 --- a/connect/mirror/src/test/java/org/apache/kafka/connect/mirror/clients/admin/FakeLocalMetadataStore.java +++ b/connect/mirror/src/test/java/org/apache/kafka/connect/mirror/clients/admin/FakeLocalMetadataStore.java @@ -20,6 +20,7 @@ import org.apache.kafka.clients.admin.Config; import org.apache.kafka.clients.admin.NewTopic; import org.apache.kafka.common.acl.AclBinding; + import org.slf4j.Logger; import org.slf4j.LoggerFactory; diff --git a/connect/mirror/src/test/java/org/apache/kafka/connect/mirror/integration/DedicatedMirrorIntegrationTest.java b/connect/mirror/src/test/java/org/apache/kafka/connect/mirror/integration/DedicatedMirrorIntegrationTest.java index 07bbafd751622..6e35e962ed92a 100644 --- a/connect/mirror/src/test/java/org/apache/kafka/connect/mirror/integration/DedicatedMirrorIntegrationTest.java +++ b/connect/mirror/src/test/java/org/apache/kafka/connect/mirror/integration/DedicatedMirrorIntegrationTest.java @@ -35,6 +35,7 @@ import org.apache.kafka.connect.util.FutureCallback; import org.apache.kafka.connect.util.clusters.EmbeddedKafkaCluster; import org.apache.kafka.test.NoRetryException; + import org.junit.jupiter.api.AfterEach; import org.junit.jupiter.api.BeforeEach; import org.junit.jupiter.api.Tag; diff --git a/connect/mirror/src/test/java/org/apache/kafka/connect/mirror/integration/IdentityReplicationIntegrationTest.java b/connect/mirror/src/test/java/org/apache/kafka/connect/mirror/integration/IdentityReplicationIntegrationTest.java index 8dc04e6074701..0a6ab4bab158c 100644 --- a/connect/mirror/src/test/java/org/apache/kafka/connect/mirror/integration/IdentityReplicationIntegrationTest.java +++ b/connect/mirror/src/test/java/org/apache/kafka/connect/mirror/integration/IdentityReplicationIntegrationTest.java @@ -18,11 +18,10 @@ import org.apache.kafka.connect.mirror.IdentityReplicationPolicy; -import java.util.HashMap; - +import org.junit.jupiter.api.BeforeEach; import org.junit.jupiter.api.Tag; -import org.junit.jupiter.api.BeforeEach; +import java.util.HashMap; /** * Tests MM2 replication and failover logic for {@link IdentityReplicationPolicy}. diff --git a/connect/mirror/src/test/java/org/apache/kafka/connect/mirror/integration/MirrorConnectorsIntegrationBaseTest.java b/connect/mirror/src/test/java/org/apache/kafka/connect/mirror/integration/MirrorConnectorsIntegrationBaseTest.java index 4fbd282d11ca4..3f169b46920a8 100644 --- a/connect/mirror/src/test/java/org/apache/kafka/connect/mirror/integration/MirrorConnectorsIntegrationBaseTest.java +++ b/connect/mirror/src/test/java/org/apache/kafka/connect/mirror/integration/MirrorConnectorsIntegrationBaseTest.java @@ -31,23 +31,23 @@ import org.apache.kafka.clients.producer.Producer; import org.apache.kafka.clients.producer.ProducerRecord; import org.apache.kafka.clients.producer.RecordMetadata; +import org.apache.kafka.common.TopicPartition; import org.apache.kafka.common.config.ConfigResource; import org.apache.kafka.common.config.TopicConfig; import org.apache.kafka.common.utils.Exit; -import org.apache.kafka.common.TopicPartition; import org.apache.kafka.common.utils.Time; import org.apache.kafka.common.utils.Timer; import org.apache.kafka.common.utils.Utils; import org.apache.kafka.connect.connector.Connector; +import org.apache.kafka.connect.mirror.Checkpoint; import org.apache.kafka.connect.mirror.DefaultConfigPropertyFilter; +import org.apache.kafka.connect.mirror.MirrorCheckpointConnector; import org.apache.kafka.connect.mirror.MirrorClient; import org.apache.kafka.connect.mirror.MirrorHeartbeatConnector; import org.apache.kafka.connect.mirror.MirrorMakerConfig; import org.apache.kafka.connect.mirror.MirrorSourceConnector; import org.apache.kafka.connect.mirror.MirrorUtils; import org.apache.kafka.connect.mirror.SourceAndTarget; -import org.apache.kafka.connect.mirror.Checkpoint; -import org.apache.kafka.connect.mirror.MirrorCheckpointConnector; import org.apache.kafka.connect.mirror.TestUtils; import org.apache.kafka.connect.runtime.rest.entities.ConnectorOffset; import org.apache.kafka.connect.runtime.rest.entities.ConnectorOffsets; @@ -55,14 +55,21 @@ import org.apache.kafka.connect.util.clusters.EmbeddedKafkaCluster; import org.apache.kafka.connect.util.clusters.UngracefulShutdownException; +import org.junit.jupiter.api.AfterEach; +import org.junit.jupiter.api.BeforeEach; +import org.junit.jupiter.api.Tag; +import org.junit.jupiter.api.Test; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + import java.time.Duration; import java.util.ArrayList; import java.util.Arrays; import java.util.Collection; -import java.util.HashSet; -import java.util.List; import java.util.Collections; import java.util.HashMap; +import java.util.HashSet; +import java.util.List; import java.util.Map; import java.util.Objects; import java.util.Properties; @@ -77,20 +84,13 @@ import java.util.function.LongUnaryOperator; import java.util.stream.Collectors; -import org.junit.jupiter.api.Tag; -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; - import static org.apache.kafka.clients.consumer.ConsumerConfig.AUTO_OFFSET_RESET_CONFIG; import static org.apache.kafka.test.TestUtils.waitForCondition; import static org.junit.jupiter.api.Assertions.assertEquals; -import static org.junit.jupiter.api.Assertions.assertNotEquals; import static org.junit.jupiter.api.Assertions.assertFalse; -import static org.junit.jupiter.api.Assertions.assertTrue; +import static org.junit.jupiter.api.Assertions.assertNotEquals; import static org.junit.jupiter.api.Assertions.assertNotNull; -import org.junit.jupiter.api.Test; -import org.junit.jupiter.api.AfterEach; -import org.junit.jupiter.api.BeforeEach; +import static org.junit.jupiter.api.Assertions.assertTrue; /** * Tests MM2 replication and failover/failback logic. diff --git a/connect/mirror/src/test/java/org/apache/kafka/connect/mirror/integration/MirrorConnectorsIntegrationExactlyOnceTest.java b/connect/mirror/src/test/java/org/apache/kafka/connect/mirror/integration/MirrorConnectorsIntegrationExactlyOnceTest.java index 36e7b34dd54d1..e02cc4c02b332 100644 --- a/connect/mirror/src/test/java/org/apache/kafka/connect/mirror/integration/MirrorConnectorsIntegrationExactlyOnceTest.java +++ b/connect/mirror/src/test/java/org/apache/kafka/connect/mirror/integration/MirrorConnectorsIntegrationExactlyOnceTest.java @@ -19,6 +19,7 @@ import org.apache.kafka.connect.connector.Connector; import org.apache.kafka.connect.mirror.MirrorSourceConnector; import org.apache.kafka.connect.runtime.distributed.DistributedConfig; + import org.junit.jupiter.api.BeforeEach; import org.junit.jupiter.api.Tag; import org.junit.jupiter.api.Test; diff --git a/connect/mirror/src/test/java/org/apache/kafka/connect/mirror/integration/MirrorConnectorsIntegrationSSLTest.java b/connect/mirror/src/test/java/org/apache/kafka/connect/mirror/integration/MirrorConnectorsIntegrationSSLTest.java index 95f8a704085c3..18f734a00a2ac 100644 --- a/connect/mirror/src/test/java/org/apache/kafka/connect/mirror/integration/MirrorConnectorsIntegrationSSLTest.java +++ b/connect/mirror/src/test/java/org/apache/kafka/connect/mirror/integration/MirrorConnectorsIntegrationSSLTest.java @@ -16,10 +16,6 @@ */ package org.apache.kafka.connect.mirror.integration; -import java.util.Map; -import java.util.Properties; -import java.util.stream.Collectors; - import org.apache.kafka.clients.CommonClientConfigs; import org.apache.kafka.common.config.SslConfigs; import org.apache.kafka.common.config.types.Password; @@ -32,6 +28,10 @@ import org.junit.jupiter.api.BeforeEach; import org.junit.jupiter.api.Tag; +import java.util.Map; +import java.util.Properties; +import java.util.stream.Collectors; + /** * Tests MM2 replication with SSL enabled at backup kafka cluster */ @@ -64,4 +64,3 @@ public void startClusters() throws Exception { super.startClusters(); } } - diff --git a/connect/mirror/src/test/java/org/apache/kafka/connect/mirror/integration/MirrorConnectorsIntegrationTransactionsTest.java b/connect/mirror/src/test/java/org/apache/kafka/connect/mirror/integration/MirrorConnectorsIntegrationTransactionsTest.java index c192d420375ce..87fc48fd3f3d2 100644 --- a/connect/mirror/src/test/java/org/apache/kafka/connect/mirror/integration/MirrorConnectorsIntegrationTransactionsTest.java +++ b/connect/mirror/src/test/java/org/apache/kafka/connect/mirror/integration/MirrorConnectorsIntegrationTransactionsTest.java @@ -21,6 +21,7 @@ import org.apache.kafka.clients.producer.ProducerConfig; import org.apache.kafka.clients.producer.ProducerRecord; import org.apache.kafka.connect.util.clusters.EmbeddedConnectCluster; + import org.junit.jupiter.api.BeforeEach; import java.util.HashMap; diff --git a/connect/mirror/src/test/java/org/apache/kafka/connect/mirror/integration/MirrorConnectorsWithCustomForwardingAdminIntegrationTest.java b/connect/mirror/src/test/java/org/apache/kafka/connect/mirror/integration/MirrorConnectorsWithCustomForwardingAdminIntegrationTest.java index 70f1cd6f6a343..d94ce632ae10c 100644 --- a/connect/mirror/src/test/java/org/apache/kafka/connect/mirror/integration/MirrorConnectorsWithCustomForwardingAdminIntegrationTest.java +++ b/connect/mirror/src/test/java/org/apache/kafka/connect/mirror/integration/MirrorConnectorsWithCustomForwardingAdminIntegrationTest.java @@ -33,6 +33,7 @@ import org.apache.kafka.connect.mirror.clients.admin.FakeForwardingAdminWithLocalMetadata; import org.apache.kafka.connect.mirror.clients.admin.FakeLocalMetadataStore; import org.apache.kafka.connect.util.clusters.EmbeddedKafkaCluster; + import org.junit.jupiter.api.AfterEach; import org.junit.jupiter.api.BeforeEach; import org.junit.jupiter.api.Tag; diff --git a/connect/test-plugins/src/main/java/org/apache/kafka/connect/tools/MockConnector.java b/connect/test-plugins/src/main/java/org/apache/kafka/connect/tools/MockConnector.java index 7bd277ac434d9..267466a4b0b52 100644 --- a/connect/test-plugins/src/main/java/org/apache/kafka/connect/tools/MockConnector.java +++ b/connect/test-plugins/src/main/java/org/apache/kafka/connect/tools/MockConnector.java @@ -20,6 +20,7 @@ import org.apache.kafka.common.utils.AppInfoParser; import org.apache.kafka.connect.connector.Connector; import org.apache.kafka.connect.connector.Task; + import org.slf4j.Logger; import org.slf4j.LoggerFactory; diff --git a/connect/test-plugins/src/main/java/org/apache/kafka/connect/tools/MockSinkTask.java b/connect/test-plugins/src/main/java/org/apache/kafka/connect/tools/MockSinkTask.java index 77f5192892fea..392c58fb01f34 100644 --- a/connect/test-plugins/src/main/java/org/apache/kafka/connect/tools/MockSinkTask.java +++ b/connect/test-plugins/src/main/java/org/apache/kafka/connect/tools/MockSinkTask.java @@ -19,6 +19,7 @@ import org.apache.kafka.common.utils.AppInfoParser; import org.apache.kafka.connect.sink.SinkRecord; import org.apache.kafka.connect.sink.SinkTask; + import org.slf4j.Logger; import org.slf4j.LoggerFactory; diff --git a/connect/test-plugins/src/main/java/org/apache/kafka/connect/tools/MockSourceTask.java b/connect/test-plugins/src/main/java/org/apache/kafka/connect/tools/MockSourceTask.java index 601f8b26f8415..c09fa6a53194e 100644 --- a/connect/test-plugins/src/main/java/org/apache/kafka/connect/tools/MockSourceTask.java +++ b/connect/test-plugins/src/main/java/org/apache/kafka/connect/tools/MockSourceTask.java @@ -19,6 +19,7 @@ import org.apache.kafka.common.utils.AppInfoParser; import org.apache.kafka.connect.source.SourceRecord; import org.apache.kafka.connect.source.SourceTask; + import org.slf4j.Logger; import org.slf4j.LoggerFactory; diff --git a/connect/test-plugins/src/main/java/org/apache/kafka/connect/tools/SchemaSourceTask.java b/connect/test-plugins/src/main/java/org/apache/kafka/connect/tools/SchemaSourceTask.java index b73d93ea6ee44..5849dd27ca51f 100644 --- a/connect/test-plugins/src/main/java/org/apache/kafka/connect/tools/SchemaSourceTask.java +++ b/connect/test-plugins/src/main/java/org/apache/kafka/connect/tools/SchemaSourceTask.java @@ -23,6 +23,7 @@ import org.apache.kafka.connect.source.SourceRecord; import org.apache.kafka.connect.source.SourceTask; import org.apache.kafka.server.util.ThroughputThrottler; + import org.slf4j.Logger; import org.slf4j.LoggerFactory; diff --git a/connect/test-plugins/src/main/java/org/apache/kafka/connect/tools/VerifiableSinkTask.java b/connect/test-plugins/src/main/java/org/apache/kafka/connect/tools/VerifiableSinkTask.java index ff71ff8132db2..303e2a2ee0011 100644 --- a/connect/test-plugins/src/main/java/org/apache/kafka/connect/tools/VerifiableSinkTask.java +++ b/connect/test-plugins/src/main/java/org/apache/kafka/connect/tools/VerifiableSinkTask.java @@ -16,14 +16,15 @@ */ package org.apache.kafka.connect.tools; -import com.fasterxml.jackson.core.JsonProcessingException; -import com.fasterxml.jackson.databind.ObjectMapper; import org.apache.kafka.clients.consumer.OffsetAndMetadata; import org.apache.kafka.common.TopicPartition; import org.apache.kafka.connect.errors.ConnectException; import org.apache.kafka.connect.sink.SinkRecord; import org.apache.kafka.connect.sink.SinkTask; +import com.fasterxml.jackson.core.JsonProcessingException; +import com.fasterxml.jackson.databind.ObjectMapper; + import java.util.ArrayList; import java.util.Collection; import java.util.HashMap; diff --git a/connect/test-plugins/src/main/java/org/apache/kafka/connect/tools/VerifiableSourceTask.java b/connect/test-plugins/src/main/java/org/apache/kafka/connect/tools/VerifiableSourceTask.java index bf90500d3237f..49151b40d1ebb 100644 --- a/connect/test-plugins/src/main/java/org/apache/kafka/connect/tools/VerifiableSourceTask.java +++ b/connect/test-plugins/src/main/java/org/apache/kafka/connect/tools/VerifiableSourceTask.java @@ -16,16 +16,18 @@ */ package org.apache.kafka.connect.tools; -import com.fasterxml.jackson.core.JsonProcessingException; -import com.fasterxml.jackson.databind.ObjectMapper; -import org.apache.kafka.connect.data.SchemaBuilder; -import org.apache.kafka.connect.data.Struct; -import org.apache.kafka.server.util.ThroughputThrottler; import org.apache.kafka.clients.producer.RecordMetadata; import org.apache.kafka.connect.data.Schema; +import org.apache.kafka.connect.data.SchemaBuilder; +import org.apache.kafka.connect.data.Struct; import org.apache.kafka.connect.errors.ConnectException; import org.apache.kafka.connect.source.SourceRecord; import org.apache.kafka.connect.source.SourceTask; +import org.apache.kafka.server.util.ThroughputThrottler; + +import com.fasterxml.jackson.core.JsonProcessingException; +import com.fasterxml.jackson.databind.ObjectMapper; + import org.slf4j.Logger; import org.slf4j.LoggerFactory; diff --git a/connect/transforms/src/main/java/org/apache/kafka/connect/transforms/Cast.java b/connect/transforms/src/main/java/org/apache/kafka/connect/transforms/Cast.java index 31ac77cd210c3..2e00e34ebdd53 100644 --- a/connect/transforms/src/main/java/org/apache/kafka/connect/transforms/Cast.java +++ b/connect/transforms/src/main/java/org/apache/kafka/connect/transforms/Cast.java @@ -39,6 +39,7 @@ import org.apache.kafka.connect.errors.DataException; import org.apache.kafka.connect.transforms.util.SchemaUtil; import org.apache.kafka.connect.transforms.util.SimpleConfig; + import org.slf4j.Logger; import org.slf4j.LoggerFactory; diff --git a/connect/transforms/src/main/java/org/apache/kafka/connect/transforms/ExtractField.java b/connect/transforms/src/main/java/org/apache/kafka/connect/transforms/ExtractField.java index 1b3a77799df7f..ebfded643dcfd 100644 --- a/connect/transforms/src/main/java/org/apache/kafka/connect/transforms/ExtractField.java +++ b/connect/transforms/src/main/java/org/apache/kafka/connect/transforms/ExtractField.java @@ -23,8 +23,8 @@ import org.apache.kafka.connect.data.Field; import org.apache.kafka.connect.data.Schema; import org.apache.kafka.connect.data.Struct; -import org.apache.kafka.connect.transforms.field.SingleFieldPath; import org.apache.kafka.connect.transforms.field.FieldSyntaxVersion; +import org.apache.kafka.connect.transforms.field.SingleFieldPath; import org.apache.kafka.connect.transforms.util.SimpleConfig; import java.util.Map; diff --git a/connect/transforms/src/main/java/org/apache/kafka/connect/transforms/Filter.java b/connect/transforms/src/main/java/org/apache/kafka/connect/transforms/Filter.java index e54531e645a7c..80fdcbf7e9c30 100644 --- a/connect/transforms/src/main/java/org/apache/kafka/connect/transforms/Filter.java +++ b/connect/transforms/src/main/java/org/apache/kafka/connect/transforms/Filter.java @@ -16,13 +16,13 @@ */ package org.apache.kafka.connect.transforms; -import java.util.Map; - import org.apache.kafka.common.config.ConfigDef; import org.apache.kafka.common.utils.AppInfoParser; import org.apache.kafka.connect.components.Versioned; import org.apache.kafka.connect.connector.ConnectRecord; +import java.util.Map; + /** * Drops all records, filtering them from subsequent transformations in the chain. * This is intended to be used conditionally to filter out records matching (or not matching) diff --git a/connect/transforms/src/main/java/org/apache/kafka/connect/transforms/InsertField.java b/connect/transforms/src/main/java/org/apache/kafka/connect/transforms/InsertField.java index d31bbe010a67e..ddc29c2e2c16f 100644 --- a/connect/transforms/src/main/java/org/apache/kafka/connect/transforms/InsertField.java +++ b/connect/transforms/src/main/java/org/apache/kafka/connect/transforms/InsertField.java @@ -29,8 +29,8 @@ import org.apache.kafka.connect.data.SchemaBuilder; import org.apache.kafka.connect.data.Struct; import org.apache.kafka.connect.data.Timestamp; -import org.apache.kafka.connect.transforms.util.SimpleConfig; import org.apache.kafka.connect.transforms.util.SchemaUtil; +import org.apache.kafka.connect.transforms.util.SimpleConfig; import java.util.Date; import java.util.HashMap; diff --git a/connect/transforms/src/main/java/org/apache/kafka/connect/transforms/MaskField.java b/connect/transforms/src/main/java/org/apache/kafka/connect/transforms/MaskField.java index a88454d0af099..abca5917d3f72 100644 --- a/connect/transforms/src/main/java/org/apache/kafka/connect/transforms/MaskField.java +++ b/connect/transforms/src/main/java/org/apache/kafka/connect/transforms/MaskField.java @@ -16,7 +16,6 @@ */ package org.apache.kafka.connect.transforms; -import java.util.ArrayList; import org.apache.kafka.common.config.ConfigDef; import org.apache.kafka.common.utils.AppInfoParser; import org.apache.kafka.connect.components.Versioned; @@ -31,6 +30,7 @@ import java.math.BigDecimal; import java.math.BigInteger; +import java.util.ArrayList; import java.util.Date; import java.util.HashMap; import java.util.HashSet; diff --git a/connect/transforms/src/main/java/org/apache/kafka/connect/transforms/RegexRouter.java b/connect/transforms/src/main/java/org/apache/kafka/connect/transforms/RegexRouter.java index d72234f263973..ae1700efdcc30 100644 --- a/connect/transforms/src/main/java/org/apache/kafka/connect/transforms/RegexRouter.java +++ b/connect/transforms/src/main/java/org/apache/kafka/connect/transforms/RegexRouter.java @@ -22,6 +22,7 @@ import org.apache.kafka.connect.connector.ConnectRecord; import org.apache.kafka.connect.transforms.util.RegexValidator; import org.apache.kafka.connect.transforms.util.SimpleConfig; + import org.slf4j.Logger; import org.slf4j.LoggerFactory; diff --git a/connect/transforms/src/main/java/org/apache/kafka/connect/transforms/SetSchemaMetadata.java b/connect/transforms/src/main/java/org/apache/kafka/connect/transforms/SetSchemaMetadata.java index 280fb5833f6a3..84505d59397f6 100644 --- a/connect/transforms/src/main/java/org/apache/kafka/connect/transforms/SetSchemaMetadata.java +++ b/connect/transforms/src/main/java/org/apache/kafka/connect/transforms/SetSchemaMetadata.java @@ -26,6 +26,7 @@ import org.apache.kafka.connect.data.Schema; import org.apache.kafka.connect.data.Struct; import org.apache.kafka.connect.transforms.util.SimpleConfig; + import org.slf4j.Logger; import org.slf4j.LoggerFactory; diff --git a/connect/transforms/src/main/java/org/apache/kafka/connect/transforms/predicates/HasHeaderKey.java b/connect/transforms/src/main/java/org/apache/kafka/connect/transforms/predicates/HasHeaderKey.java index 6229bb7b72908..fe0a99c4abe72 100644 --- a/connect/transforms/src/main/java/org/apache/kafka/connect/transforms/predicates/HasHeaderKey.java +++ b/connect/transforms/src/main/java/org/apache/kafka/connect/transforms/predicates/HasHeaderKey.java @@ -16,9 +16,6 @@ */ package org.apache.kafka.connect.transforms.predicates; -import java.util.Iterator; -import java.util.Map; - import org.apache.kafka.common.config.ConfigDef; import org.apache.kafka.common.utils.AppInfoParser; import org.apache.kafka.connect.components.Versioned; @@ -26,6 +23,9 @@ import org.apache.kafka.connect.header.Header; import org.apache.kafka.connect.transforms.util.SimpleConfig; +import java.util.Iterator; +import java.util.Map; + /** * A predicate which is true for records with at least one header with the configured name. * @param The type of connect record. diff --git a/connect/transforms/src/main/java/org/apache/kafka/connect/transforms/predicates/RecordIsTombstone.java b/connect/transforms/src/main/java/org/apache/kafka/connect/transforms/predicates/RecordIsTombstone.java index dc1f602b4a384..848a8452b2879 100644 --- a/connect/transforms/src/main/java/org/apache/kafka/connect/transforms/predicates/RecordIsTombstone.java +++ b/connect/transforms/src/main/java/org/apache/kafka/connect/transforms/predicates/RecordIsTombstone.java @@ -16,13 +16,13 @@ */ package org.apache.kafka.connect.transforms.predicates; -import java.util.Map; - import org.apache.kafka.common.config.ConfigDef; import org.apache.kafka.common.utils.AppInfoParser; import org.apache.kafka.connect.components.Versioned; import org.apache.kafka.connect.connector.ConnectRecord; +import java.util.Map; + /** * A predicate which is true for records which are tombstones (i.e. have null value). * @param The type of connect record. diff --git a/connect/transforms/src/main/java/org/apache/kafka/connect/transforms/predicates/TopicNameMatches.java b/connect/transforms/src/main/java/org/apache/kafka/connect/transforms/predicates/TopicNameMatches.java index 7e78a69df0010..bcd519eba5a67 100644 --- a/connect/transforms/src/main/java/org/apache/kafka/connect/transforms/predicates/TopicNameMatches.java +++ b/connect/transforms/src/main/java/org/apache/kafka/connect/transforms/predicates/TopicNameMatches.java @@ -16,10 +16,6 @@ */ package org.apache.kafka.connect.transforms.predicates; -import java.util.Map; -import java.util.regex.Pattern; -import java.util.regex.PatternSyntaxException; - import org.apache.kafka.common.config.ConfigDef; import org.apache.kafka.common.config.ConfigException; import org.apache.kafka.common.utils.AppInfoParser; @@ -28,6 +24,10 @@ import org.apache.kafka.connect.transforms.util.RegexValidator; import org.apache.kafka.connect.transforms.util.SimpleConfig; +import java.util.Map; +import java.util.regex.Pattern; +import java.util.regex.PatternSyntaxException; + /** * A predicate which is true for records with a topic name that matches the configured regular expression. * @param The type of connect record. diff --git a/connect/transforms/src/test/java/org/apache/kafka/connect/transforms/CastTest.java b/connect/transforms/src/test/java/org/apache/kafka/connect/transforms/CastTest.java index d6a6a7fb4f9ad..a70fac1d1db6f 100644 --- a/connect/transforms/src/test/java/org/apache/kafka/connect/transforms/CastTest.java +++ b/connect/transforms/src/test/java/org/apache/kafka/connect/transforms/CastTest.java @@ -29,6 +29,7 @@ import org.apache.kafka.connect.data.Values; import org.apache.kafka.connect.errors.DataException; import org.apache.kafka.connect.source.SourceRecord; + import org.junit.jupiter.api.AfterEach; import org.junit.jupiter.api.Test; diff --git a/connect/transforms/src/test/java/org/apache/kafka/connect/transforms/DropHeadersTest.java b/connect/transforms/src/test/java/org/apache/kafka/connect/transforms/DropHeadersTest.java index 95649bd2b3d04..d164512897b64 100644 --- a/connect/transforms/src/test/java/org/apache/kafka/connect/transforms/DropHeadersTest.java +++ b/connect/transforms/src/test/java/org/apache/kafka/connect/transforms/DropHeadersTest.java @@ -21,6 +21,7 @@ import org.apache.kafka.connect.data.Schema; import org.apache.kafka.connect.header.ConnectHeaders; import org.apache.kafka.connect.source.SourceRecord; + import org.junit.jupiter.api.Test; import java.util.HashMap; @@ -119,4 +120,3 @@ private SourceRecord sourceRecord(ConnectHeaders headers) { keySchema, key, valueSchema, value, timestamp, headers); } } - diff --git a/connect/transforms/src/test/java/org/apache/kafka/connect/transforms/ExtractFieldTest.java b/connect/transforms/src/test/java/org/apache/kafka/connect/transforms/ExtractFieldTest.java index 9b7d53085de51..2ac2f91829bf0 100644 --- a/connect/transforms/src/test/java/org/apache/kafka/connect/transforms/ExtractFieldTest.java +++ b/connect/transforms/src/test/java/org/apache/kafka/connect/transforms/ExtractFieldTest.java @@ -16,17 +16,18 @@ */ package org.apache.kafka.connect.transforms; -import java.util.HashMap; import org.apache.kafka.common.utils.AppInfoParser; import org.apache.kafka.connect.data.Schema; import org.apache.kafka.connect.data.SchemaBuilder; import org.apache.kafka.connect.data.Struct; import org.apache.kafka.connect.sink.SinkRecord; import org.apache.kafka.connect.transforms.field.FieldSyntaxVersion; + import org.junit.jupiter.api.AfterEach; import org.junit.jupiter.api.Test; import java.util.Collections; +import java.util.HashMap; import java.util.Map; import static org.junit.jupiter.api.Assertions.assertEquals; diff --git a/connect/transforms/src/test/java/org/apache/kafka/connect/transforms/FlattenTest.java b/connect/transforms/src/test/java/org/apache/kafka/connect/transforms/FlattenTest.java index c418ba427c91e..f771d4f0ac3e4 100644 --- a/connect/transforms/src/test/java/org/apache/kafka/connect/transforms/FlattenTest.java +++ b/connect/transforms/src/test/java/org/apache/kafka/connect/transforms/FlattenTest.java @@ -23,6 +23,7 @@ import org.apache.kafka.connect.data.Struct; import org.apache.kafka.connect.errors.DataException; import org.apache.kafka.connect.source.SourceRecord; + import org.junit.jupiter.api.AfterEach; import org.junit.jupiter.api.Test; diff --git a/connect/transforms/src/test/java/org/apache/kafka/connect/transforms/HeaderFromTest.java b/connect/transforms/src/test/java/org/apache/kafka/connect/transforms/HeaderFromTest.java index 61e05757474b6..91efd3fb3e195 100644 --- a/connect/transforms/src/test/java/org/apache/kafka/connect/transforms/HeaderFromTest.java +++ b/connect/transforms/src/test/java/org/apache/kafka/connect/transforms/HeaderFromTest.java @@ -25,6 +25,7 @@ import org.apache.kafka.connect.header.Header; import org.apache.kafka.connect.header.Headers; import org.apache.kafka.connect.source.SourceRecord; + import org.junit.jupiter.params.ParameterizedTest; import org.junit.jupiter.params.provider.Arguments; import org.junit.jupiter.params.provider.MethodSource; @@ -356,4 +357,3 @@ private static void assertSameRecord(SourceRecord expected, SourceRecord xformed } } - diff --git a/connect/transforms/src/test/java/org/apache/kafka/connect/transforms/HoistFieldTest.java b/connect/transforms/src/test/java/org/apache/kafka/connect/transforms/HoistFieldTest.java index 63f416f99f392..b72dddcdd155c 100644 --- a/connect/transforms/src/test/java/org/apache/kafka/connect/transforms/HoistFieldTest.java +++ b/connect/transforms/src/test/java/org/apache/kafka/connect/transforms/HoistFieldTest.java @@ -20,6 +20,7 @@ import org.apache.kafka.connect.data.Schema; import org.apache.kafka.connect.data.Struct; import org.apache.kafka.connect.sink.SinkRecord; + import org.junit.jupiter.api.AfterEach; import org.junit.jupiter.api.Test; diff --git a/connect/transforms/src/test/java/org/apache/kafka/connect/transforms/InsertFieldTest.java b/connect/transforms/src/test/java/org/apache/kafka/connect/transforms/InsertFieldTest.java index 727f9d0b21719..f9754aa5a52b2 100644 --- a/connect/transforms/src/test/java/org/apache/kafka/connect/transforms/InsertFieldTest.java +++ b/connect/transforms/src/test/java/org/apache/kafka/connect/transforms/InsertFieldTest.java @@ -23,6 +23,7 @@ import org.apache.kafka.connect.data.Timestamp; import org.apache.kafka.connect.errors.DataException; import org.apache.kafka.connect.source.SourceRecord; + import org.junit.jupiter.api.AfterEach; import org.junit.jupiter.api.Test; diff --git a/connect/transforms/src/test/java/org/apache/kafka/connect/transforms/InsertHeaderTest.java b/connect/transforms/src/test/java/org/apache/kafka/connect/transforms/InsertHeaderTest.java index 237662701f4a4..20c5b67a50a93 100644 --- a/connect/transforms/src/test/java/org/apache/kafka/connect/transforms/InsertHeaderTest.java +++ b/connect/transforms/src/test/java/org/apache/kafka/connect/transforms/InsertHeaderTest.java @@ -22,6 +22,7 @@ import org.apache.kafka.connect.header.ConnectHeaders; import org.apache.kafka.connect.header.Headers; import org.apache.kafka.connect.source.SourceRecord; + import org.junit.jupiter.api.Test; import java.util.HashMap; @@ -123,4 +124,3 @@ public void testInsertHeaderVersionRetrievedFromAppInfoParser() { assertEquals(AppInfoParser.getVersion(), xform.version()); } } - diff --git a/connect/transforms/src/test/java/org/apache/kafka/connect/transforms/MaskFieldTest.java b/connect/transforms/src/test/java/org/apache/kafka/connect/transforms/MaskFieldTest.java index 2f5d7feaf2188..1f5a9641d2854 100644 --- a/connect/transforms/src/test/java/org/apache/kafka/connect/transforms/MaskFieldTest.java +++ b/connect/transforms/src/test/java/org/apache/kafka/connect/transforms/MaskFieldTest.java @@ -27,6 +27,7 @@ import org.apache.kafka.connect.data.Timestamp; import org.apache.kafka.connect.errors.DataException; import org.apache.kafka.connect.sink.SinkRecord; + import org.junit.jupiter.api.Test; import java.math.BigDecimal; diff --git a/connect/transforms/src/test/java/org/apache/kafka/connect/transforms/RegexRouterTest.java b/connect/transforms/src/test/java/org/apache/kafka/connect/transforms/RegexRouterTest.java index 5ba2367545b00..6ec7601bb625c 100644 --- a/connect/transforms/src/test/java/org/apache/kafka/connect/transforms/RegexRouterTest.java +++ b/connect/transforms/src/test/java/org/apache/kafka/connect/transforms/RegexRouterTest.java @@ -18,6 +18,7 @@ import org.apache.kafka.common.utils.AppInfoParser; import org.apache.kafka.connect.sink.SinkRecord; + import org.junit.jupiter.api.Test; import java.util.HashMap; diff --git a/connect/transforms/src/test/java/org/apache/kafka/connect/transforms/ReplaceFieldTest.java b/connect/transforms/src/test/java/org/apache/kafka/connect/transforms/ReplaceFieldTest.java index 9df4013c9a4dd..53566cada7cd5 100644 --- a/connect/transforms/src/test/java/org/apache/kafka/connect/transforms/ReplaceFieldTest.java +++ b/connect/transforms/src/test/java/org/apache/kafka/connect/transforms/ReplaceFieldTest.java @@ -21,6 +21,7 @@ import org.apache.kafka.connect.data.SchemaBuilder; import org.apache.kafka.connect.data.Struct; import org.apache.kafka.connect.sink.SinkRecord; + import org.junit.jupiter.api.AfterEach; import org.junit.jupiter.api.Test; diff --git a/connect/transforms/src/test/java/org/apache/kafka/connect/transforms/SetSchemaMetadataTest.java b/connect/transforms/src/test/java/org/apache/kafka/connect/transforms/SetSchemaMetadataTest.java index 8c0f45ce8658c..46f8be778876a 100644 --- a/connect/transforms/src/test/java/org/apache/kafka/connect/transforms/SetSchemaMetadataTest.java +++ b/connect/transforms/src/test/java/org/apache/kafka/connect/transforms/SetSchemaMetadataTest.java @@ -23,6 +23,7 @@ import org.apache.kafka.connect.data.Struct; import org.apache.kafka.connect.errors.DataException; import org.apache.kafka.connect.sink.SinkRecord; + import org.junit.jupiter.api.AfterEach; import org.junit.jupiter.api.Test; diff --git a/connect/transforms/src/test/java/org/apache/kafka/connect/transforms/TimestampConverterTest.java b/connect/transforms/src/test/java/org/apache/kafka/connect/transforms/TimestampConverterTest.java index 8b5611e9fbcd5..fb3f3be62650a 100644 --- a/connect/transforms/src/test/java/org/apache/kafka/connect/transforms/TimestampConverterTest.java +++ b/connect/transforms/src/test/java/org/apache/kafka/connect/transforms/TimestampConverterTest.java @@ -26,6 +26,7 @@ import org.apache.kafka.connect.data.Time; import org.apache.kafka.connect.data.Timestamp; import org.apache.kafka.connect.source.SourceRecord; + import org.junit.jupiter.api.AfterEach; import org.junit.jupiter.api.Test; @@ -37,10 +38,10 @@ import java.util.TimeZone; import static org.apache.kafka.connect.transforms.util.Requirements.requireStruct; +import static org.junit.jupiter.api.Assertions.assertDoesNotThrow; import static org.junit.jupiter.api.Assertions.assertEquals; import static org.junit.jupiter.api.Assertions.assertNull; import static org.junit.jupiter.api.Assertions.assertThrows; -import static org.junit.jupiter.api.Assertions.assertDoesNotThrow; public class TimestampConverterTest { private static final TimeZone UTC = TimeZone.getTimeZone("UTC"); diff --git a/connect/transforms/src/test/java/org/apache/kafka/connect/transforms/TimestampRouterTest.java b/connect/transforms/src/test/java/org/apache/kafka/connect/transforms/TimestampRouterTest.java index ec1e3bbe0547c..43b3b1f384ff5 100644 --- a/connect/transforms/src/test/java/org/apache/kafka/connect/transforms/TimestampRouterTest.java +++ b/connect/transforms/src/test/java/org/apache/kafka/connect/transforms/TimestampRouterTest.java @@ -18,6 +18,7 @@ import org.apache.kafka.common.utils.AppInfoParser; import org.apache.kafka.connect.source.SourceRecord; + import org.junit.jupiter.api.AfterEach; import org.junit.jupiter.api.BeforeEach; import org.junit.jupiter.api.Test; diff --git a/connect/transforms/src/test/java/org/apache/kafka/connect/transforms/ValueToKeyTest.java b/connect/transforms/src/test/java/org/apache/kafka/connect/transforms/ValueToKeyTest.java index 1ffebce6e3bde..a7d032009e513 100644 --- a/connect/transforms/src/test/java/org/apache/kafka/connect/transforms/ValueToKeyTest.java +++ b/connect/transforms/src/test/java/org/apache/kafka/connect/transforms/ValueToKeyTest.java @@ -22,6 +22,7 @@ import org.apache.kafka.connect.data.Struct; import org.apache.kafka.connect.errors.DataException; import org.apache.kafka.connect.sink.SinkRecord; + import org.junit.jupiter.api.AfterEach; import org.junit.jupiter.api.Test; diff --git a/connect/transforms/src/test/java/org/apache/kafka/connect/transforms/field/FieldPathNotationTest.java b/connect/transforms/src/test/java/org/apache/kafka/connect/transforms/field/FieldPathNotationTest.java index d823bb4524435..504c41aa45c44 100644 --- a/connect/transforms/src/test/java/org/apache/kafka/connect/transforms/field/FieldPathNotationTest.java +++ b/connect/transforms/src/test/java/org/apache/kafka/connect/transforms/field/FieldPathNotationTest.java @@ -17,6 +17,7 @@ package org.apache.kafka.connect.transforms.field; import org.apache.kafka.common.config.ConfigException; + import org.junit.jupiter.api.Test; import static org.junit.jupiter.api.Assertions.assertArrayEquals; diff --git a/connect/transforms/src/test/java/org/apache/kafka/connect/transforms/field/FieldSyntaxVersionTest.java b/connect/transforms/src/test/java/org/apache/kafka/connect/transforms/field/FieldSyntaxVersionTest.java index a9356b5232cc2..d400141c95b71 100644 --- a/connect/transforms/src/test/java/org/apache/kafka/connect/transforms/field/FieldSyntaxVersionTest.java +++ b/connect/transforms/src/test/java/org/apache/kafka/connect/transforms/field/FieldSyntaxVersionTest.java @@ -19,6 +19,7 @@ import org.apache.kafka.common.config.AbstractConfig; import org.apache.kafka.common.config.ConfigDef; import org.apache.kafka.common.config.ConfigException; + import org.junit.jupiter.api.Test; import org.junit.jupiter.params.ParameterizedTest; import org.junit.jupiter.params.provider.CsvSource; diff --git a/connect/transforms/src/test/java/org/apache/kafka/connect/transforms/field/SingleFieldPathTest.java b/connect/transforms/src/test/java/org/apache/kafka/connect/transforms/field/SingleFieldPathTest.java index f232bdbb0c867..a7292d2060923 100644 --- a/connect/transforms/src/test/java/org/apache/kafka/connect/transforms/field/SingleFieldPathTest.java +++ b/connect/transforms/src/test/java/org/apache/kafka/connect/transforms/field/SingleFieldPathTest.java @@ -16,17 +16,18 @@ */ package org.apache.kafka.connect.transforms.field; -import static org.junit.jupiter.api.Assertions.assertEquals; -import static org.junit.jupiter.api.Assertions.assertNull; - import org.apache.kafka.connect.data.Schema; import org.apache.kafka.connect.data.SchemaBuilder; import org.apache.kafka.connect.data.Struct; + import org.junit.jupiter.api.Test; import java.util.HashMap; import java.util.Map; +import static org.junit.jupiter.api.Assertions.assertEquals; +import static org.junit.jupiter.api.Assertions.assertNull; + class SingleFieldPathTest { @Test void shouldFindField() { diff --git a/connect/transforms/src/test/java/org/apache/kafka/connect/transforms/predicates/HasHeaderKeyTest.java b/connect/transforms/src/test/java/org/apache/kafka/connect/transforms/predicates/HasHeaderKeyTest.java index d21c98f98c2ec..e3e3920858d27 100644 --- a/connect/transforms/src/test/java/org/apache/kafka/connect/transforms/predicates/HasHeaderKeyTest.java +++ b/connect/transforms/src/test/java/org/apache/kafka/connect/transforms/predicates/HasHeaderKeyTest.java @@ -22,6 +22,7 @@ import org.apache.kafka.connect.header.Header; import org.apache.kafka.connect.source.SourceRecord; import org.apache.kafka.connect.transforms.util.SimpleConfig; + import org.junit.jupiter.api.Test; import java.util.Arrays; diff --git a/connect/transforms/src/test/java/org/apache/kafka/connect/transforms/predicates/TopicNameMatchesTest.java b/connect/transforms/src/test/java/org/apache/kafka/connect/transforms/predicates/TopicNameMatchesTest.java index 0640803e6e648..3d9ac4dba9048 100644 --- a/connect/transforms/src/test/java/org/apache/kafka/connect/transforms/predicates/TopicNameMatchesTest.java +++ b/connect/transforms/src/test/java/org/apache/kafka/connect/transforms/predicates/TopicNameMatchesTest.java @@ -20,6 +20,7 @@ import org.apache.kafka.common.config.ConfigValue; import org.apache.kafka.connect.source.SourceRecord; import org.apache.kafka.connect.transforms.util.SimpleConfig; + import org.junit.jupiter.api.Test; import java.util.Collections; diff --git a/connect/transforms/src/test/java/org/apache/kafka/connect/transforms/util/NonEmptyListValidatorTest.java b/connect/transforms/src/test/java/org/apache/kafka/connect/transforms/util/NonEmptyListValidatorTest.java index ff3c4f720fd55..5060346a2d91b 100644 --- a/connect/transforms/src/test/java/org/apache/kafka/connect/transforms/util/NonEmptyListValidatorTest.java +++ b/connect/transforms/src/test/java/org/apache/kafka/connect/transforms/util/NonEmptyListValidatorTest.java @@ -17,6 +17,7 @@ package org.apache.kafka.connect.transforms.util; import org.apache.kafka.common.config.ConfigException; + import org.junit.jupiter.api.Test; import java.util.Collections;