From 40427542c370d9044821d725e20bd84adda23140 Mon Sep 17 00:00:00 2001 From: Gunnar Morling Date: Sat, 1 Jun 2019 10:29:14 +0200 Subject: [PATCH 1/6] #2663 Kafka Streams extension --- bom/pom.xml | 10 +++ .../builditem/FeatureBuildItem.java | 1 + extensions/kafka-streams/deployment/pom.xml | 72 +++++++++++++++++++ .../deployment/KafkaStreamsProcessor.java | 72 +++++++++++++++++++ extensions/kafka-streams/pom.xml | 37 ++++++++++ extensions/kafka-streams/runtime/pom.xml | 71 ++++++++++++++++++ .../streams/runtime/KafkaStreamsTemplate.java | 28 ++++++++ .../graal/KafkaStreamsSubstitutions.java | 36 ++++++++++ extensions/pom.xml | 1 + 9 files changed, 328 insertions(+) create mode 100644 extensions/kafka-streams/deployment/pom.xml create mode 100644 extensions/kafka-streams/deployment/src/main/java/io/quarkus/kafka/streams/deployment/KafkaStreamsProcessor.java create mode 100644 extensions/kafka-streams/pom.xml create mode 100644 extensions/kafka-streams/runtime/pom.xml create mode 100644 extensions/kafka-streams/runtime/src/main/java/io/quarkus/kafka/streams/runtime/KafkaStreamsTemplate.java create mode 100644 extensions/kafka-streams/runtime/src/main/java/io/quarkus/kafka/streams/runtime/graal/KafkaStreamsSubstitutions.java diff --git a/bom/pom.xml b/bom/pom.xml index 7140ff0758369..cdcb3ef9938e6 100644 --- a/bom/pom.xml +++ b/bom/pom.xml @@ -347,6 +347,11 @@ quarkus-kafka-client ${project.version} + + io.quarkus + quarkus-kafka-streams + ${project.version} + io.quarkus quarkus-smallrye-health @@ -1048,6 +1053,11 @@ kafka-clients ${kafka-clients.version} + + org.apache.kafka + kafka-streams + ${kafka-clients.version} + org.apache.kafka kafka_2.12 diff --git a/core/deployment/src/main/java/io/quarkus/deployment/builditem/FeatureBuildItem.java b/core/deployment/src/main/java/io/quarkus/deployment/builditem/FeatureBuildItem.java index 0c14c419a3854..16a5306118ac4 100644 --- a/core/deployment/src/main/java/io/quarkus/deployment/builditem/FeatureBuildItem.java +++ b/core/deployment/src/main/java/io/quarkus/deployment/builditem/FeatureBuildItem.java @@ -29,6 +29,7 @@ public final class FeatureBuildItem extends MultiBuildItem { public static final String JDBC_MARIADB = "jdbc-mariadb"; public static final String JDBC_POSTGRESQL = "jdbc-postgresql"; public static final String JDBC_MSSQL = "jdbc-mssql"; + public static final String KAFKA_STREAMS = "kafka-streams"; public static final String KEYCLOAK = "keycloak"; public static final String KOTLIN = "kotlin"; public static final String MAILER = "mailer"; diff --git a/extensions/kafka-streams/deployment/pom.xml b/extensions/kafka-streams/deployment/pom.xml new file mode 100644 index 0000000000000..e13d762d978eb --- /dev/null +++ b/extensions/kafka-streams/deployment/pom.xml @@ -0,0 +1,72 @@ + + + + + + quarkus-kafka-streams-parent + io.quarkus + 999-SNAPSHOT + ../ + + 4.0.0 + + quarkus-kafka-streams-deployment + Quarkus - Kafka Streams - Deployment + + + + io.quarkus + quarkus-core-deployment + + + io.quarkus + quarkus-kafka-streams + + + + + io.quarkus + quarkus-junit5-internal + test + + + io.rest-assured + rest-assured + test + + + + + + + maven-compiler-plugin + + + + io.quarkus + quarkus-extension-processor + ${project.version} + + + + + + + + diff --git a/extensions/kafka-streams/deployment/src/main/java/io/quarkus/kafka/streams/deployment/KafkaStreamsProcessor.java b/extensions/kafka-streams/deployment/src/main/java/io/quarkus/kafka/streams/deployment/KafkaStreamsProcessor.java new file mode 100644 index 0000000000000..0bde6d29b1630 --- /dev/null +++ b/extensions/kafka-streams/deployment/src/main/java/io/quarkus/kafka/streams/deployment/KafkaStreamsProcessor.java @@ -0,0 +1,72 @@ +/* + * Copyright 2019 Red Hat, Inc. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package io.quarkus.kafka.streams.deployment; + +import java.io.IOException; + +import org.apache.kafka.common.serialization.Serdes.ByteArraySerde; +import org.apache.kafka.streams.errors.DefaultProductionExceptionHandler; +import org.apache.kafka.streams.errors.LogAndFailExceptionHandler; +import org.apache.kafka.streams.processor.DefaultPartitionGrouper; +import org.apache.kafka.streams.processor.FailOnInvalidTimestamp; +import org.apache.kafka.streams.processor.internals.StreamsPartitionAssignor; +import org.rocksdb.util.Environment; + +import io.quarkus.deployment.annotations.BuildProducer; +import io.quarkus.deployment.annotations.BuildStep; +import io.quarkus.deployment.annotations.ExecutionTime; +import io.quarkus.deployment.annotations.Record; +import io.quarkus.deployment.builditem.FeatureBuildItem; +import io.quarkus.deployment.builditem.substrate.ReflectiveClassBuildItem; +import io.quarkus.deployment.builditem.substrate.RuntimeReinitializedClassBuildItem; +import io.quarkus.deployment.builditem.substrate.SubstrateResourceBuildItem; +import io.quarkus.deployment.recording.RecorderContext; +import io.quarkus.kafka.streams.runtime.KafkaStreamsTemplate; + +class KafkaStreamsProcessor { + + @BuildStep + @Record(ExecutionTime.STATIC_INIT) + void build(RecorderContext recorder, + BuildProducer feature, + BuildProducer reflectiveClasses, + BuildProducer reinitialized, + BuildProducer nativeLibs) throws IOException { + + feature.produce(new FeatureBuildItem(FeatureBuildItem.KAFKA_STREAMS)); + + reflectiveClasses.produce(new ReflectiveClassBuildItem(true, false, false, StreamsPartitionAssignor.class)); + reflectiveClasses.produce(new ReflectiveClassBuildItem(true, false, false, DefaultPartitionGrouper.class)); + reflectiveClasses.produce(new ReflectiveClassBuildItem(true, false, false, DefaultProductionExceptionHandler.class)); + reflectiveClasses.produce(new ReflectiveClassBuildItem(true, false, false, LogAndFailExceptionHandler.class)); + reflectiveClasses.produce(new ReflectiveClassBuildItem(true, false, false, ByteArraySerde.class)); + reflectiveClasses.produce(new ReflectiveClassBuildItem(true, false, false, FailOnInvalidTimestamp.class)); + + nativeLibs.produce(new SubstrateResourceBuildItem(Environment.getJniLibraryFileName("rocksdb"))); + + // re-initializing RocksDB to enable load of native libs + reinitialized.produce(new RuntimeReinitializedClassBuildItem("org.rocksdb.RocksDB")); + } + + @BuildStep + @Record(ExecutionTime.RUNTIME_INIT) + void build(KafkaStreamsTemplate template) { + // Explicitly loading RocksDB native libs, as that's normally done from within + // static initializers which already ran during build + template.loadRocksDb(); + } +} diff --git a/extensions/kafka-streams/pom.xml b/extensions/kafka-streams/pom.xml new file mode 100644 index 0000000000000..cb6bbfa5ab669 --- /dev/null +++ b/extensions/kafka-streams/pom.xml @@ -0,0 +1,37 @@ + + + + + + quarkus-build-parent + io.quarkus + 999-SNAPSHOT + ../../build-parent/pom.xml + + 4.0.0 + + quarkus-kafka-streams-parent + Quarkus - Kafka Streams + + pom + + deployment + runtime + + diff --git a/extensions/kafka-streams/runtime/pom.xml b/extensions/kafka-streams/runtime/pom.xml new file mode 100644 index 0000000000000..5a576783aec98 --- /dev/null +++ b/extensions/kafka-streams/runtime/pom.xml @@ -0,0 +1,71 @@ + + + + + + quarkus-kafka-streams-parent + io.quarkus + 999-SNAPSHOT + ../ + + 4.0.0 + + quarkus-kafka-streams + Quarkus - Kafka Streams - Runtime + + + + io.quarkus + quarkus-core + + + io.quarkus + quarkus-kafka-client + + + org.apache.kafka + kafka-streams + + + com.oracle.substratevm + svm + + + + + + + io.quarkus + quarkus-bootstrap-maven-plugin + + + maven-compiler-plugin + + + + io.quarkus + quarkus-extension-processor + ${project.version} + + + + + + + diff --git a/extensions/kafka-streams/runtime/src/main/java/io/quarkus/kafka/streams/runtime/KafkaStreamsTemplate.java b/extensions/kafka-streams/runtime/src/main/java/io/quarkus/kafka/streams/runtime/KafkaStreamsTemplate.java new file mode 100644 index 0000000000000..06332490cb4f4 --- /dev/null +++ b/extensions/kafka-streams/runtime/src/main/java/io/quarkus/kafka/streams/runtime/KafkaStreamsTemplate.java @@ -0,0 +1,28 @@ +/* + * Copyright 2019 Red Hat, Inc. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package io.quarkus.kafka.streams.runtime; + +import org.rocksdb.RocksDB; + +import io.quarkus.runtime.annotations.Template; + +@Template +public class KafkaStreamsTemplate { + + public void loadRocksDb() { + RocksDB.loadLibrary(); + } +} diff --git a/extensions/kafka-streams/runtime/src/main/java/io/quarkus/kafka/streams/runtime/graal/KafkaStreamsSubstitutions.java b/extensions/kafka-streams/runtime/src/main/java/io/quarkus/kafka/streams/runtime/graal/KafkaStreamsSubstitutions.java new file mode 100644 index 0000000000000..007357b8f6cab --- /dev/null +++ b/extensions/kafka-streams/runtime/src/main/java/io/quarkus/kafka/streams/runtime/graal/KafkaStreamsSubstitutions.java @@ -0,0 +1,36 @@ +/* + * Copyright 2019 Red Hat, Inc. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package io.quarkus.kafka.streams.runtime.graal; + +import com.oracle.svm.core.annotate.Alias; +import com.oracle.svm.core.annotate.RecomputeFieldValue; +import com.oracle.svm.core.annotate.TargetClass; + +/** + * Resets the {@code initialized} field, so that the native libs are loaded again at + * image runtime, after they have been loaded once at build time via calls from static + * initializers. + */ +@TargetClass(className = "org.rocksdb.NativeLibraryLoader") +final class Target_org_rocksdb_NativeLibraryLoader { + + @Alias + @RecomputeFieldValue(kind = RecomputeFieldValue.Kind.Reset) + private static boolean initialized = false; +} + +public final class KafkaStreamsSubstitutions { +} diff --git a/extensions/pom.xml b/extensions/pom.xml index 28de77bceba89..8b42fa37d2ec7 100644 --- a/extensions/pom.xml +++ b/extensions/pom.xml @@ -81,6 +81,7 @@ hibernate-search-elasticsearch elasticsearch-rest-client kafka-client + kafka-streams spring-di From b91841b4e9f39baab5f686b0cfe77b820a9b05d0 Mon Sep 17 00:00:00 2001 From: Gunnar Morling Date: Sun, 2 Jun 2019 09:54:27 +0200 Subject: [PATCH 2/6] #2663 Adding integration test for Kafka Streams --- integration-tests/kafka/pom.xml | 9 ++ .../io/quarkus/it/kafka/JsonObjectSerde.java | 84 +++++++++++++ .../it/kafka/KafkaStreamsPipeline.java | 112 +++++++++++++++++ .../quarkus/it/main/KafkaStreamsITCase.java | 8 ++ .../io/quarkus/it/main/KafkaStreamsTest.java | 115 ++++++++++++++++++ 5 files changed, 328 insertions(+) create mode 100644 integration-tests/kafka/src/main/java/io/quarkus/it/kafka/JsonObjectSerde.java create mode 100644 integration-tests/kafka/src/main/java/io/quarkus/it/kafka/KafkaStreamsPipeline.java create mode 100644 integration-tests/kafka/src/test/java/io/quarkus/it/main/KafkaStreamsITCase.java create mode 100644 integration-tests/kafka/src/test/java/io/quarkus/it/main/KafkaStreamsTest.java diff --git a/integration-tests/kafka/pom.xml b/integration-tests/kafka/pom.xml index 319bc05d480f2..011d8f6e5b0c4 100644 --- a/integration-tests/kafka/pom.xml +++ b/integration-tests/kafka/pom.xml @@ -49,12 +49,20 @@ io.quarkus quarkus-resteasy + + io.quarkus + quarkus-resteasy-jsonb + io.quarkus quarkus-kafka-client + + io.quarkus + quarkus-kafka-streams + @@ -140,6 +148,7 @@ true true + true diff --git a/integration-tests/kafka/src/main/java/io/quarkus/it/kafka/KafkaStreamsPipeline.java b/integration-tests/kafka/src/main/java/io/quarkus/it/kafka/KafkaStreamsPipeline.java index a67ebe9c84971..83c156158c111 100644 --- a/integration-tests/kafka/src/main/java/io/quarkus/it/kafka/KafkaStreamsPipeline.java +++ b/integration-tests/kafka/src/main/java/io/quarkus/it/kafka/KafkaStreamsPipeline.java @@ -1,11 +1,16 @@ package io.quarkus.it.kafka; +import static org.awaitility.Awaitility.await; + +import java.util.Arrays; import java.util.HashMap; import java.util.Map; import java.util.Properties; +import java.util.Set; +import java.util.concurrent.Callable; import java.util.concurrent.ExecutorService; import java.util.concurrent.Executors; -import java.util.concurrent.atomic.AtomicBoolean; +import java.util.concurrent.TimeUnit; import javax.enterprise.context.ApplicationScoped; import javax.enterprise.event.Observes; @@ -41,11 +46,13 @@ import org.apache.kafka.streams.state.Stores; import io.quarkus.runtime.StartupEvent; - @ApplicationScoped @Path("/kafkastreams") public class KafkaStreamsPipeline { + private static final String CATEGORIES_TOPIC_NAME = "streams-test-categories"; + private static final String CUSTOMERS_TOPIC_NAME = "streams-test-customers"; + private KafkaStreams streams; private ExecutorService executor; @@ -62,11 +69,11 @@ void onStart(@Observes StartupEvent ev) { JsonObjectSerde jsonNodeSerde = new JsonObjectSerde(); KTable categories = builder.table( - "streams-test-categories", + CATEGORIES_TOPIC_NAME, Consumed.with(Serdes.Integer(), jsonNodeSerde)); KStream customers = builder - .stream("streams-test-customers", Consumed.with(Serdes.Integer(), jsonNodeSerde)) + .stream(CUSTOMERS_TOPIC_NAME, Consumed.with(Serdes.Integer(), jsonNodeSerde)) .selectKey((k, v) -> v.getJsonNumber("category").intValue()) .join( categories, @@ -123,24 +130,20 @@ private void waitForTopicsToBeCreated(String bootstrapServers) { config.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapServers); try (AdminClient adminClient = AdminClient.create(config)) { - AtomicBoolean topicsCreated = new AtomicBoolean(false); + await().until(topicsCreated(adminClient, CATEGORIES_TOPIC_NAME, CUSTOMERS_TOPIC_NAME)); + } + } - while (topicsCreated.get() == false) { + private Callable topicsCreated(AdminClient adminClient, String... expectedTopics) { + return new Callable() { + + @Override + public Boolean call() throws Exception { ListTopicsResult topics = adminClient.listTopics(); - topics.names().whenComplete((t, e) -> { - if (e != null) { - throw new RuntimeException(e); - } else if (t.contains("streams-test-categories") && t.contains("streams-test-customers")) { - topicsCreated.set(true); - } - }); - - try { - Thread.sleep(50); - } catch (InterruptedException e) { - throw new RuntimeException(e); - } + Set topicNames = topics.names().get(10, TimeUnit.SECONDS); + + return topicNames.containsAll(Arrays.asList(expectedTopics)); } - } - } + }; + } } diff --git a/integration-tests/kafka/src/test/java/io/quarkus/it/main/KafkaStreamsTest.java b/integration-tests/kafka/src/test/java/io/quarkus/it/main/KafkaStreamsTest.java index 6afe7bb469298..72c7e90f3764e 100644 --- a/integration-tests/kafka/src/test/java/io/quarkus/it/main/KafkaStreamsTest.java +++ b/integration-tests/kafka/src/test/java/io/quarkus/it/main/KafkaStreamsTest.java @@ -1,11 +1,16 @@ package io.quarkus.it.main; +import java.io.StringReader; import java.time.Duration; import java.util.ArrayList; import java.util.Collections; import java.util.List; import java.util.Properties; +import javax.json.Json; +import javax.json.JsonObject; +import javax.json.JsonReader; + import org.apache.kafka.clients.consumer.Consumer; import org.apache.kafka.clients.consumer.ConsumerConfig; import org.apache.kafka.clients.consumer.ConsumerRecord; @@ -63,27 +68,35 @@ public void testKafkaStreams() throws Exception { ConsumerRecord record = records.get(0); Assertions.assertEquals(101, record.key()); - Assertions.assertEquals( - "{\"id\":101,\"name\":\"Bob\",\"category\":{\"name\":\"B2B\",\"value\":\"business-to-business\"}}", - record.value()); + JsonObject customer = parse(record.value()); + Assertions.assertEquals(101, customer.getInt("id")); + Assertions.assertEquals("Bob", customer.getString("name")); + Assertions.assertEquals("B2B", customer.getJsonObject("category").getString("name")); + Assertions.assertEquals("business-to-business", customer.getJsonObject("category").getString("value")); record = records.get(1); Assertions.assertEquals(102, record.key()); - Assertions.assertEquals( - "{\"id\":102,\"name\":\"Becky\",\"category\":{\"name\":\"B2C\",\"value\":\"business-to-customer\"}}", - record.value()); + customer = parse(record.value()); + Assertions.assertEquals(102, customer.getInt("id")); + Assertions.assertEquals("Becky", customer.getString("name")); + Assertions.assertEquals("B2C", customer.getJsonObject("category").getString("name")); + Assertions.assertEquals("business-to-customer", customer.getJsonObject("category").getString("value")); record = records.get(2); Assertions.assertEquals(103, record.key()); - Assertions.assertEquals( - "{\"id\":103,\"name\":\"Bruce\",\"category\":{\"name\":\"B2B\",\"value\":\"business-to-business\"}}", - record.value()); + customer = parse(record.value()); + Assertions.assertEquals(103, customer.getInt("id")); + Assertions.assertEquals("Bruce", customer.getString("name")); + Assertions.assertEquals("B2B", customer.getJsonObject("category").getString("name")); + Assertions.assertEquals("business-to-business", customer.getJsonObject("category").getString("value")); record = records.get(3); Assertions.assertEquals(104, record.key()); - Assertions.assertEquals( - "{\"id\":104,\"name\":\"Bert\",\"category\":{\"name\":\"B2B\",\"value\":\"business-to-business\"}}", - record.value()); + customer = parse(record.value()); + Assertions.assertEquals(104, customer.getInt("id")); + Assertions.assertEquals("Bert", customer.getString("name")); + Assertions.assertEquals("B2B", customer.getJsonObject("category").getString("name")); + Assertions.assertEquals("business-to-business", customer.getJsonObject("category").getString("value")); assertCategoryCount(1, 3); assertCategoryCount(2, 1); @@ -144,4 +157,10 @@ private List> poll(Consumer con return result; } + + private JsonObject parse(String json) { + try(JsonReader reader = Json.createReader(new StringReader(json))) { + return reader.readObject(); + } + } } From c7bc6917439b9b8fdf58340f82680ac5a229befb Mon Sep 17 00:00:00 2001 From: Gunnar Morling Date: Tue, 4 Jun 2019 22:24:37 +0200 Subject: [PATCH 6/6] #2691 Moving integration tests to "kafka" package --- .../main/java/io/quarkus/it/kafka/KafkaStreamsPipeline.java | 3 ++- .../io/quarkus/it/{main => kafka}/KafkaConsumerITCase.java | 2 +- .../java/io/quarkus/it/{main => kafka}/KafkaConsumerTest.java | 2 +- .../io/quarkus/it/{main => kafka}/KafkaProducerITCase.java | 2 +- .../java/io/quarkus/it/{main => kafka}/KafkaProducerTest.java | 2 +- .../io/quarkus/it/{main => kafka}/KafkaStreamsITCase.java | 2 +- .../java/io/quarkus/it/{main => kafka}/KafkaStreamsTest.java | 4 ++-- .../java/io/quarkus/it/{main => kafka}/KafkaTestResource.java | 2 +- 8 files changed, 10 insertions(+), 9 deletions(-) rename integration-tests/kafka/src/test/java/io/quarkus/it/{main => kafka}/KafkaConsumerITCase.java (81%) rename integration-tests/kafka/src/test/java/io/quarkus/it/{main => kafka}/KafkaConsumerTest.java (98%) rename integration-tests/kafka/src/test/java/io/quarkus/it/{main => kafka}/KafkaProducerITCase.java (81%) rename integration-tests/kafka/src/test/java/io/quarkus/it/{main => kafka}/KafkaProducerTest.java (98%) rename integration-tests/kafka/src/test/java/io/quarkus/it/{main => kafka}/KafkaStreamsITCase.java (80%) rename integration-tests/kafka/src/test/java/io/quarkus/it/{main => kafka}/KafkaStreamsTest.java (98%) rename integration-tests/kafka/src/test/java/io/quarkus/it/{main => kafka}/KafkaTestResource.java (97%) diff --git a/integration-tests/kafka/src/main/java/io/quarkus/it/kafka/KafkaStreamsPipeline.java b/integration-tests/kafka/src/main/java/io/quarkus/it/kafka/KafkaStreamsPipeline.java index 83c156158c111..d7cebe8a7312f 100644 --- a/integration-tests/kafka/src/main/java/io/quarkus/it/kafka/KafkaStreamsPipeline.java +++ b/integration-tests/kafka/src/main/java/io/quarkus/it/kafka/KafkaStreamsPipeline.java @@ -46,6 +46,7 @@ import org.apache.kafka.streams.state.Stores; import io.quarkus.runtime.StartupEvent; + @ApplicationScoped @Path("/kafkastreams") public class KafkaStreamsPipeline { @@ -145,5 +146,5 @@ public Boolean call() throws Exception { return topicNames.containsAll(Arrays.asList(expectedTopics)); } }; - } + } } diff --git a/integration-tests/kafka/src/test/java/io/quarkus/it/main/KafkaConsumerITCase.java b/integration-tests/kafka/src/test/java/io/quarkus/it/kafka/KafkaConsumerITCase.java similarity index 81% rename from integration-tests/kafka/src/test/java/io/quarkus/it/main/KafkaConsumerITCase.java rename to integration-tests/kafka/src/test/java/io/quarkus/it/kafka/KafkaConsumerITCase.java index 42134366cd911..f8eb48897dbcb 100644 --- a/integration-tests/kafka/src/test/java/io/quarkus/it/main/KafkaConsumerITCase.java +++ b/integration-tests/kafka/src/test/java/io/quarkus/it/kafka/KafkaConsumerITCase.java @@ -1,4 +1,4 @@ -package io.quarkus.it.main; +package io.quarkus.it.kafka; import io.quarkus.test.junit.SubstrateTest; diff --git a/integration-tests/kafka/src/test/java/io/quarkus/it/main/KafkaConsumerTest.java b/integration-tests/kafka/src/test/java/io/quarkus/it/kafka/KafkaConsumerTest.java similarity index 98% rename from integration-tests/kafka/src/test/java/io/quarkus/it/main/KafkaConsumerTest.java rename to integration-tests/kafka/src/test/java/io/quarkus/it/kafka/KafkaConsumerTest.java index 6c77f2bad54c9..f4f4aacaac0f5 100644 --- a/integration-tests/kafka/src/test/java/io/quarkus/it/main/KafkaConsumerTest.java +++ b/integration-tests/kafka/src/test/java/io/quarkus/it/kafka/KafkaConsumerTest.java @@ -1,4 +1,4 @@ -package io.quarkus.it.main; +package io.quarkus.it.kafka; import java.util.Properties; diff --git a/integration-tests/kafka/src/test/java/io/quarkus/it/main/KafkaProducerITCase.java b/integration-tests/kafka/src/test/java/io/quarkus/it/kafka/KafkaProducerITCase.java similarity index 81% rename from integration-tests/kafka/src/test/java/io/quarkus/it/main/KafkaProducerITCase.java rename to integration-tests/kafka/src/test/java/io/quarkus/it/kafka/KafkaProducerITCase.java index 7bf5b3c6d7734..4075439e8cfc1 100644 --- a/integration-tests/kafka/src/test/java/io/quarkus/it/main/KafkaProducerITCase.java +++ b/integration-tests/kafka/src/test/java/io/quarkus/it/kafka/KafkaProducerITCase.java @@ -1,4 +1,4 @@ -package io.quarkus.it.main; +package io.quarkus.it.kafka; import io.quarkus.test.junit.SubstrateTest; diff --git a/integration-tests/kafka/src/test/java/io/quarkus/it/main/KafkaProducerTest.java b/integration-tests/kafka/src/test/java/io/quarkus/it/kafka/KafkaProducerTest.java similarity index 98% rename from integration-tests/kafka/src/test/java/io/quarkus/it/main/KafkaProducerTest.java rename to integration-tests/kafka/src/test/java/io/quarkus/it/kafka/KafkaProducerTest.java index 1b14c552b95b0..af3add9461855 100644 --- a/integration-tests/kafka/src/test/java/io/quarkus/it/main/KafkaProducerTest.java +++ b/integration-tests/kafka/src/test/java/io/quarkus/it/kafka/KafkaProducerTest.java @@ -1,4 +1,4 @@ -package io.quarkus.it.main; +package io.quarkus.it.kafka; import java.time.Duration; import java.util.Collections; diff --git a/integration-tests/kafka/src/test/java/io/quarkus/it/main/KafkaStreamsITCase.java b/integration-tests/kafka/src/test/java/io/quarkus/it/kafka/KafkaStreamsITCase.java similarity index 80% rename from integration-tests/kafka/src/test/java/io/quarkus/it/main/KafkaStreamsITCase.java rename to integration-tests/kafka/src/test/java/io/quarkus/it/kafka/KafkaStreamsITCase.java index a2d05684cf7d5..2410b099dc1bd 100644 --- a/integration-tests/kafka/src/test/java/io/quarkus/it/main/KafkaStreamsITCase.java +++ b/integration-tests/kafka/src/test/java/io/quarkus/it/kafka/KafkaStreamsITCase.java @@ -1,4 +1,4 @@ -package io.quarkus.it.main; +package io.quarkus.it.kafka; import io.quarkus.test.junit.SubstrateTest; diff --git a/integration-tests/kafka/src/test/java/io/quarkus/it/main/KafkaStreamsTest.java b/integration-tests/kafka/src/test/java/io/quarkus/it/kafka/KafkaStreamsTest.java similarity index 98% rename from integration-tests/kafka/src/test/java/io/quarkus/it/main/KafkaStreamsTest.java rename to integration-tests/kafka/src/test/java/io/quarkus/it/kafka/KafkaStreamsTest.java index 72c7e90f3764e..17e0a3afe8230 100644 --- a/integration-tests/kafka/src/test/java/io/quarkus/it/main/KafkaStreamsTest.java +++ b/integration-tests/kafka/src/test/java/io/quarkus/it/kafka/KafkaStreamsTest.java @@ -1,4 +1,4 @@ -package io.quarkus.it.main; +package io.quarkus.it.kafka; import java.io.StringReader; import java.time.Duration; @@ -159,7 +159,7 @@ private List> poll(Consumer con } private JsonObject parse(String json) { - try(JsonReader reader = Json.createReader(new StringReader(json))) { + try (JsonReader reader = Json.createReader(new StringReader(json))) { return reader.readObject(); } } diff --git a/integration-tests/kafka/src/test/java/io/quarkus/it/main/KafkaTestResource.java b/integration-tests/kafka/src/test/java/io/quarkus/it/kafka/KafkaTestResource.java similarity index 97% rename from integration-tests/kafka/src/test/java/io/quarkus/it/main/KafkaTestResource.java rename to integration-tests/kafka/src/test/java/io/quarkus/it/kafka/KafkaTestResource.java index 5fa03a9b8822a..4f87b8bebbfd3 100644 --- a/integration-tests/kafka/src/test/java/io/quarkus/it/main/KafkaTestResource.java +++ b/integration-tests/kafka/src/test/java/io/quarkus/it/kafka/KafkaTestResource.java @@ -1,4 +1,4 @@ -package io.quarkus.it.main; +package io.quarkus.it.kafka; import java.io.File; import java.util.Collections;