diff --git a/data-prepper-plugins/kafka-plugins/build.gradle b/data-prepper-plugins/kafka-plugins/build.gradle index 96d8cc1c64..7ed03e1d58 100644 --- a/data-prepper-plugins/kafka-plugins/build.gradle +++ b/data-prepper-plugins/kafka-plugins/build.gradle @@ -35,21 +35,16 @@ dependencies { implementation 'io.micrometer:micrometer-core' implementation libs.commons.lang3 implementation 'io.confluent:kafka-avro-serializer:7.4.0' + implementation 'io.confluent:kafka-json-schema-serializer:7.4.0' implementation 'io.confluent:kafka-schema-registry-client:7.4.0' - implementation ('io.confluent:kafka-schema-registry:7.4.0:tests') { - exclude group: 'org.glassfish.jersey.containers', module: 'jersey-container-servlet' - exclude group: 'org.glassfish.jersey.inject', module: 'jersey-hk2' - exclude group: 'org.glassfish.jersey.ext', module: 'jersey-bean-validation' - } implementation 'software.amazon.awssdk:sts' implementation 'software.amazon.awssdk:auth' implementation 'software.amazon.awssdk:kafka' implementation 'software.amazon.awssdk:kms' implementation 'software.amazon.msk:aws-msk-iam-auth:2.0.3' implementation 'software.amazon.glue:schema-registry-serde:1.1.15' - implementation 'io.confluent:kafka-json-schema-serializer:7.4.0' implementation project(':data-prepper-plugins:failures-common') - implementation 'com.github.fge:json-schema-validator:2.2.14' + implementation 'com.github.java-json-tools:json-schema-validator:2.2.14' implementation 'commons-collections:commons-collections:3.2.2' implementation 'software.amazon.awssdk:s3' implementation 'software.amazon.awssdk:apache-client' @@ -65,7 +60,6 @@ dependencies { testImplementation 'org.apache.kafka:kafka_2.13:3.6.1' testImplementation 'org.apache.kafka:kafka_2.13:3.6.1:test' testImplementation 'org.apache.curator:curator-test:5.5.0' - testImplementation 'io.confluent:kafka-schema-registry:7.4.0' testImplementation('com.kjetland:mbknor-jackson-jsonschema_2.13:1.0.39') testImplementation group: 'org.powermock', name: 'powermock-api-mockito2', version: '2.0.9' testImplementation project(':data-prepper-plugins:otel-metrics-source') @@ -74,8 +68,15 @@ dependencies { testImplementation libs.protobuf.util testImplementation libs.commons.io testImplementation libs.armeria.grpc + testImplementation 'com.fasterxml.jackson.dataformat:jackson-dataformat-yaml' integrationTestImplementation testLibs.junit.vintage + integrationTestImplementation 'io.confluent:kafka-schema-registry:7.4.0' + integrationTestImplementation ('io.confluent:kafka-schema-registry:7.4.0:tests') { + exclude group: 'org.glassfish.jersey.containers', module: 'jersey-container-servlet' + exclude group: 'org.glassfish.jersey.inject', module: 'jersey-hk2' + exclude group: 'org.glassfish.jersey.ext', module: 'jersey-bean-validation' + } constraints { implementation('org.mozilla:rhino') { @@ -130,4 +131,3 @@ task integrationTest(type: Test) { includeTestsMatching '*IT' } } - diff --git a/data-prepper-plugins/kafka-plugins/src/main/java/org/opensearch/dataprepper/plugins/kafka/consumer/KafkaCustomConsumerFactory.java b/data-prepper-plugins/kafka-plugins/src/main/java/org/opensearch/dataprepper/plugins/kafka/consumer/KafkaCustomConsumerFactory.java index fab0a4f56e..961e0328d3 100644 --- a/data-prepper-plugins/kafka-plugins/src/main/java/org/opensearch/dataprepper/plugins/kafka/consumer/KafkaCustomConsumerFactory.java +++ b/data-prepper-plugins/kafka-plugins/src/main/java/org/opensearch/dataprepper/plugins/kafka/consumer/KafkaCustomConsumerFactory.java @@ -10,7 +10,6 @@ import io.confluent.kafka.serializers.KafkaAvroDeserializer; import io.confluent.kafka.serializers.KafkaAvroDeserializerConfig; import io.confluent.kafka.serializers.KafkaJsonDeserializer; -import kafka.common.BrokerEndPointNotAvailableException; import org.apache.commons.lang3.StringUtils; import org.apache.kafka.clients.consumer.ConsumerConfig; import org.apache.kafka.clients.consumer.KafkaConsumer; @@ -108,8 +107,7 @@ public List createConsumersForTopic(final KafkaConsumerConf }); } catch (Exception e) { - if (e instanceof BrokerNotAvailableException || - e instanceof BrokerEndPointNotAvailableException || e instanceof TimeoutException) { + if (e instanceof BrokerNotAvailableException || e instanceof TimeoutException) { LOG.error("The Kafka broker is not available."); } else { LOG.error("Failed to setup the Kafka Source Plugin.", e); diff --git a/data-prepper-plugins/kafka-plugins/src/main/java/org/opensearch/dataprepper/plugins/kafka/source/KafkaSource.java b/data-prepper-plugins/kafka-plugins/src/main/java/org/opensearch/dataprepper/plugins/kafka/source/KafkaSource.java index ec27f1f370..fbdee41105 100644 --- a/data-prepper-plugins/kafka-plugins/src/main/java/org/opensearch/dataprepper/plugins/kafka/source/KafkaSource.java +++ b/data-prepper-plugins/kafka-plugins/src/main/java/org/opensearch/dataprepper/plugins/kafka/source/KafkaSource.java @@ -11,7 +11,6 @@ import io.confluent.kafka.schemaregistry.client.rest.exceptions.RestClientException; import io.confluent.kafka.serializers.KafkaAvroDeserializer; import io.confluent.kafka.serializers.json.KafkaJsonSchemaDeserializer; -import kafka.common.BrokerEndPointNotAvailableException; import org.apache.avro.generic.GenericRecord; import org.apache.commons.lang3.StringUtils; import org.apache.kafka.clients.consumer.ConsumerConfig; @@ -146,8 +145,7 @@ public void start(Buffer> buffer) { executorService.submit(consumer); }); } catch (Exception e) { - if (e instanceof BrokerNotAvailableException || - e instanceof BrokerEndPointNotAvailableException || e instanceof TimeoutException) { + if (e instanceof BrokerNotAvailableException || e instanceof TimeoutException) { LOG.error("The kafka broker is not available..."); } else { LOG.error("Failed to setup the Kafka Source Plugin.", e);