diff --git a/avro-serializer/src/main/java/io/confluent/kafka/serializers/KafkaAvroDecoder.java b/avro-serializer/src/main/java/io/confluent/kafka/serializers/KafkaAvroDecoder.java
index 8ee30017e0c..a470ee1b116 100644
--- a/avro-serializer/src/main/java/io/confluent/kafka/serializers/KafkaAvroDecoder.java
+++ b/avro-serializer/src/main/java/io/confluent/kafka/serializers/KafkaAvroDecoder.java
@@ -17,10 +17,10 @@
 package io.confluent.kafka.serializers;
 
 import io.confluent.kafka.schemaregistry.client.SchemaRegistryClient;
-import kafka.serializer.Decoder;
 import kafka.utils.VerifiableProperties;
 import org.apache.avro.Schema;
+import org.apache.kafka.tools.api.Decoder;
 
 public class KafkaAvroDecoder extends AbstractKafkaAvroDeserializer implements Decoder {
diff --git a/core/src/main/java/io/confluent/kafka/schemaregistry/storage/KafkaStoreReaderThread.java b/core/src/main/java/io/confluent/kafka/schemaregistry/storage/KafkaStoreReaderThread.java
index 37ea7340f8c..7da41b1f4cf 100644
--- a/core/src/main/java/io/confluent/kafka/schemaregistry/storage/KafkaStoreReaderThread.java
+++ b/core/src/main/java/io/confluent/kafka/schemaregistry/storage/KafkaStoreReaderThread.java
@@ -20,6 +20,7 @@
 import io.confluent.kafka.schemaregistry.storage.StoreUpdateHandler.ValidationStatus;
 import io.confluent.kafka.schemaregistry.utils.ShutdownableThread;
 import java.io.IOException;
+import java.time.Duration;
 import java.util.Collections;
 import java.util.HashMap;
 import java.util.Map;
@@ -194,7 +195,7 @@ public Map checkpoints() {
   @Override
   public void doWork() {
     try {
-      ConsumerRecords records = consumer.poll(Long.MAX_VALUE);
+      ConsumerRecords records = consumer.poll(Duration.ofMillis(Long.MAX_VALUE));
       storeUpdateHandler.startBatch(records.count());
       for (ConsumerRecord record : records) {
         K messageKey = null;
diff --git a/core/src/test/java/io/confluent/kafka/schemaregistry/SASLClusterTestHarness.java b/core/src/test/java/io/confluent/kafka/schemaregistry/SASLClusterTestHarness.java
index 814806509d1..34bb2cc5ce1 100644
--- a/core/src/test/java/io/confluent/kafka/schemaregistry/SASLClusterTestHarness.java
+++ b/core/src/test/java/io/confluent/kafka/schemaregistry/SASLClusterTestHarness.java
@@ -97,7 +97,7 @@ public void setUp() throws Exception {
 
   private void createPrincipal(File keytab, String principalNoRealm) throws Exception {
     Seq principals = JavaConverters.asScalaBuffer(
-        Arrays.asList(principalNoRealm)
+        Arrays.asList(principalNoRealm)
     ).toList();
     kdc.createPrincipal(keytab, principals);
   }
diff --git a/core/src/test/java/io/confluent/kafka/schemaregistry/client/CachedSchemaRegistryClientTest.java b/core/src/test/java/io/confluent/kafka/schemaregistry/client/CachedSchemaRegistryClientTest.java
index 9f68187e8bf..a8bab10a1e6 100644
--- a/core/src/test/java/io/confluent/kafka/schemaregistry/client/CachedSchemaRegistryClientTest.java
+++ b/core/src/test/java/io/confluent/kafka/schemaregistry/client/CachedSchemaRegistryClientTest.java
@@ -15,6 +15,7 @@
 package io.confluent.kafka.schemaregistry.client;
 
 import io.confluent.kafka.serializers.context.strategy.ContextNameStrategy;
+import java.time.Duration;
 import java.util.Map;
 import org.apache.avro.Schema;
 import org.apache.avro.generic.GenericData;
@@ -84,7 +85,7 @@ private ArrayList consume(Consumer consumer, String topi
     int i = 0;
     while (i < numMessages) {
-      ConsumerRecords records = consumer.poll(1000);
+      ConsumerRecords records = consumer.poll(Duration.ofMillis(1000));
       for (ConsumerRecord record : records) {
         recordList.add(record.value());
         i++;
diff --git a/core/src/test/java/io/confluent/kafka/schemaregistry/client/MockSchemaRegistryClientTest.java b/core/src/test/java/io/confluent/kafka/schemaregistry/client/MockSchemaRegistryClientTest.java
index 9f98f660b04..c5d9acb1884 100644
--- a/core/src/test/java/io/confluent/kafka/schemaregistry/client/MockSchemaRegistryClientTest.java
+++ b/core/src/test/java/io/confluent/kafka/schemaregistry/client/MockSchemaRegistryClientTest.java
@@ -19,6 +19,7 @@
 import io.confluent.kafka.schemaregistry.avro.AvroSchema;
 import io.confluent.kafka.schemaregistry.avro.AvroSchemaProvider;
 import io.confluent.kafka.schemaregistry.client.rest.entities.SubjectVersion;
+import java.time.Duration;
 import java.util.Collection;
 import java.util.Collections;
 import java.util.Iterator;
@@ -81,7 +82,7 @@ private ArrayList consume(Consumer consumer, String topi
     int i = 0;
     while (i < numMessages) {
-      ConsumerRecords records = consumer.poll(1000);
+      ConsumerRecords records = consumer.poll(Duration.ofMillis(1000));
       for (ConsumerRecord record : records) {
         recordList.add(record.value());
         i++;
diff --git a/json-serializer/src/main/java/io/confluent/kafka/serializers/KafkaJsonDecoder.java b/json-serializer/src/main/java/io/confluent/kafka/serializers/KafkaJsonDecoder.java
index 1d9ae376e17..d84f635a586 100644
--- a/json-serializer/src/main/java/io/confluent/kafka/serializers/KafkaJsonDecoder.java
+++ b/json-serializer/src/main/java/io/confluent/kafka/serializers/KafkaJsonDecoder.java
@@ -16,9 +16,10 @@
 
 package io.confluent.kafka.serializers;
 
-import kafka.serializer.Decoder;
 import kafka.utils.VerifiableProperties;
 
+import org.apache.kafka.tools.api.Decoder;
+
 /**
  * Decode JSON data to an Object.
  */
diff --git a/schema-rules/src/test/java/io/confluent/kafka/schemaregistry/rules/cel/CelExecutorIntegrationTest.java b/schema-rules/src/test/java/io/confluent/kafka/schemaregistry/rules/cel/CelExecutorIntegrationTest.java
index eea37e13aae..19a8c0ed5b2 100644
--- a/schema-rules/src/test/java/io/confluent/kafka/schemaregistry/rules/cel/CelExecutorIntegrationTest.java
+++ b/schema-rules/src/test/java/io/confluent/kafka/schemaregistry/rules/cel/CelExecutorIntegrationTest.java
@@ -42,6 +42,7 @@
 import io.confluent.kafka.serializers.KafkaAvroSerializer;
 import io.confluent.kafka.serializers.WrapperKeyDeserializer;
 import io.confluent.kafka.serializers.WrapperKeySerializer;
+import java.time.Duration;
 import java.util.AbstractMap.SimpleEntry;
 import java.util.ArrayList;
 import java.util.Arrays;
@@ -166,7 +167,7 @@ private static List> consume(Consumer
     int i = 0;
     do {
-      ConsumerRecords records = consumer.poll(1000);
+      ConsumerRecords records = consumer.poll(Duration.ofMillis(1000));
       for (ConsumerRecord record : records) {
         recordList.add(new SimpleEntry<>(record.key(), record.value()));
         i++;
diff --git a/schema-rules/src/test/java/io/confluent/kafka/schemaregistry/rules/jsonata/JsonataExecutorIntegrationTest.java b/schema-rules/src/test/java/io/confluent/kafka/schemaregistry/rules/jsonata/JsonataExecutorIntegrationTest.java
index e3c4288ee53..88ccd07dd36 100644
--- a/schema-rules/src/test/java/io/confluent/kafka/schemaregistry/rules/jsonata/JsonataExecutorIntegrationTest.java
+++ b/schema-rules/src/test/java/io/confluent/kafka/schemaregistry/rules/jsonata/JsonataExecutorIntegrationTest.java
@@ -66,6 +66,7 @@
 import io.confluent.kafka.serializers.protobuf.KafkaProtobufDeserializer;
 import io.confluent.kafka.serializers.protobuf.KafkaProtobufDeserializerConfig;
 import io.confluent.kafka.serializers.protobuf.KafkaProtobufSerializer;
+import java.time.Duration;
 import java.util.AbstractMap.SimpleEntry;
 import java.util.ArrayList;
 import java.util.Arrays;
@@ -159,7 +160,7 @@ private static List> consume(Consumer
     int i = 0;
     do {
-      ConsumerRecords records = consumer.poll(1000);
+      ConsumerRecords records = consumer.poll(Duration.ofMillis(1000));
       for (ConsumerRecord record : records) {
         recordList.add(new SimpleEntry<>(record.key(), record.value()));
         i++;
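Note on the serializer-module hunks: `kafka.serializer.Decoder` belongs to the old Scala client classes that are being removed from Kafka, so `KafkaAvroDecoder` and `KafkaJsonDecoder` now implement `org.apache.kafka.tools.api.Decoder` instead. A minimal sketch of a decoder written against the new interface, assuming it keeps the same single `fromBytes(byte[])` method as the removed trait; the class name and UTF-8 handling here are illustrative, not taken from this patch:

```java
import java.nio.charset.StandardCharsets;
import org.apache.kafka.tools.api.Decoder;

// Hypothetical decoder, not part of this patch: turns raw message bytes into a UTF-8 String.
public class Utf8StringDecoder implements Decoder<String> {
  @Override
  public String fromBytes(byte[] bytes) {
    // Null-safe: tombstone records carry a null payload.
    return bytes == null ? null : new String(bytes, StandardCharsets.UTF_8);
  }
}
```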
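The remaining hunks are the same mechanical migration: the `Consumer.poll(long)` overload was deprecated by KIP-266 and removed in newer clients, so every call site now passes a `java.time.Duration`. A self-contained sketch of the consume pattern the updated tests follow; the broker address, group id, and topic name below are placeholders, not values from this patch:

```java
import java.time.Duration;
import java.util.Collections;
import java.util.Properties;
import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.common.serialization.StringDeserializer;

public class PollDurationExample {
  public static void main(String[] args) {
    Properties props = new Properties();
    props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092"); // placeholder broker
    props.put(ConsumerConfig.GROUP_ID_CONFIG, "poll-duration-example");   // placeholder group
    props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());
    props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());

    try (KafkaConsumer<String, String> consumer = new KafkaConsumer<>(props)) {
      consumer.subscribe(Collections.singletonList("test-topic")); // placeholder topic
      // poll(Duration) bounds the total time the call may block, including metadata fetches,
      // whereas the removed poll(long) only bounded the wait for data.
      ConsumerRecords<String, String> records = consumer.poll(Duration.ofMillis(1000));
      for (ConsumerRecord<String, String> record : records) {
        System.out.printf("%s -> %s%n", record.key(), record.value());
      }
    }
  }
}
```

Because the new overload bounds total blocking time, the reader thread's `poll(Long.MAX_VALUE)` becomes `poll(Duration.ofMillis(Long.MAX_VALUE))` rather than a shorter timeout, preserving its block-until-data behaviour.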