From bff64d4bd63bd0a908d9174866b83f34bc2e4cba Mon Sep 17 00:00:00 2001 From: bsoaressimoes Date: Mon, 29 Jan 2024 14:15:43 +0100 Subject: [PATCH] feat: created a build item + moved configuration in new spi module --- .../api/SinkToTopicMappingBuilder.java | 56 ------------------ .../api/SourceToTopicsMappingBuilder.java | 57 ------------------- bom/application/pom.xml | 5 ++ bom/pom.xml | 2 +- deployment/pom.xml | 4 ++ .../KafkaStreamsProcessorProcessor.java | 20 ++++++- .../KafkaStreamsProcessorProcessorTest.java | 17 ++++++ impl/pom.xml | 4 ++ ...er.java => SinkToTopicMappingBuilder.java} | 37 +++++++++--- ...java => SourceToTopicsMappingBuilder.java} | 38 ++++++++++--- .../impl/TopologyProducer.java | 4 +- .../decorator/processor/DlqDecorator.java | 4 +- ...DLQProductionExceptionHandlerDelegate.java | 2 +- ...gAndSendToDlqExceptionHandlerDelegate.java | 2 +- .../CustomTopologyWithPojoDriverTest.java | 2 +- .../impl/JsonPayloadTopologyDriverTest.java | 2 +- ...hCustomObjectMapperTopologyDriverTest.java | 2 +- .../impl/KStreamMetricsQuarkusTest.java | 2 +- .../impl/KStreamTopologyDriverTest.java | 2 +- .../KStreamTopologyWithDlqDriverTest.java | 2 +- .../Kafka2ProcessorTopologyDriverTest.java | 2 +- ...ava => SinkToTopicMappingBuilderTest.java} | 12 ++-- ... => SourceToTopicsMappingBuilderTest.java} | 12 ++-- .../impl/TopologyProducerTest.java | 6 +- ...CdiRequestContextDecoratorQuarkusTest.java | 2 +- .../processor/RetryDecoratorQuarkusTest.java | 2 +- ...OrderedProducerInterceptorQuarkusTest.java | 2 +- .../ProducerInterceptorQuarkusTest.java | 2 +- .../ErrorHandlingStrategyQuarkusTest.java | 2 +- ...roductionExceptionHandlerDelegateTest.java | 4 +- ...ProductionExceptionHandlerQuarkusTest.java | 2 +- ...lqExceptionHandlerDelegateQuarkusTest.java | 2 +- ...SendToDlqExceptionHandlerDelegateTest.java | 4 +- integration-tests/json-pojo/pom.xml | 6 ++ .../jsonpojo/PojoProcessorQuarkusTest.java | 2 +- .../jsonpojo/PojoProcessorTopologyTest.java | 2 +- integration-tests/kafka-to-rest/pom.xml | 5 ++ .../PingClientProcessorQuarkusTest.java | 2 +- ...ngClientProcessorQuarkusWithRetryTest.java | 2 +- ...ProcessorQuarkusWithoutRetryCatchTest.java | 2 +- integration-tests/multioutput/pom.xml | 5 ++ .../PingProcessorTopologyTest.java | 2 +- integration-tests/simple/pom.xml | 5 ++ .../simple/PingProcessorTopologyTest.java | 2 +- .../ProcessorHealthCheckQuarkusTest.java | 2 +- integration-tests/stateful/pom.xml | 5 ++ .../stateful/PingProcessorTopologyTest.java | 2 +- .../ProcessorHealthCheckQuarkusTest.java | 2 +- pom.xml | 1 + runtime/pom.xml | 6 +- .../KStreamsProcessorConfigRuntime.java | 13 +++++ spi/Readme.md | 4 ++ spi/pom.xml | 46 +++++++++++++++ .../spi/TopologyConfigBuildItem.java | 23 ++++++++ .../spi}/properties/DlqConfig.java | 2 +- .../spi}/properties/GlobalDlqConfig.java | 2 +- .../spi}/properties/InputConfig.java | 2 +- .../properties/KStreamsProcessorConfig.java | 2 +- .../spi}/properties/OutputConfig.java | 2 +- .../spi}/properties/RetryConfig.java | 2 +- .../spi}/properties/SinkConfig.java | 2 +- .../spi}/properties/SourceConfig.java | 2 +- spi/src/main/resources/META-INF/beans.xml | 0 63 files changed, 284 insertions(+), 189 deletions(-) delete mode 100644 api/src/main/java/io/quarkiverse/kafkastreamsprocessor/api/SinkToTopicMappingBuilder.java delete mode 100644 api/src/main/java/io/quarkiverse/kafkastreamsprocessor/api/SourceToTopicsMappingBuilder.java rename impl/src/main/java/io/quarkiverse/kafkastreamsprocessor/impl/{DefaultSinkToTopicMappingBuilder.java => SinkToTopicMappingBuilder.java} (69%) rename impl/src/main/java/io/quarkiverse/kafkastreamsprocessor/impl/{DefaultSourceToTopicsMappingBuilder.java => SourceToTopicsMappingBuilder.java} (69%) rename impl/src/test/java/io/quarkiverse/kafkastreamsprocessor/impl/{DefaultSinkToTopicMappingBuilderTest.java => SinkToTopicMappingBuilderTest.java} (89%) rename impl/src/test/java/io/quarkiverse/kafkastreamsprocessor/impl/{DefaultSourceToTopicsMappingBuilderTest.java => SourceToTopicsMappingBuilderTest.java} (92%) create mode 100644 runtime/src/main/java/io/quarkiverse/kafkastreamsprocessor/runtime/KStreamsProcessorConfigRuntime.java create mode 100644 spi/Readme.md create mode 100644 spi/pom.xml create mode 100644 spi/src/main/java/io/quarkiverse/kafkastreamsprocessor/spi/TopologyConfigBuildItem.java rename {api/src/main/java/io/quarkiverse/kafkastreamsprocessor/api => spi/src/main/java/io/quarkiverse/kafkastreamsprocessor/spi}/properties/DlqConfig.java (93%) rename {api/src/main/java/io/quarkiverse/kafkastreamsprocessor/api => spi/src/main/java/io/quarkiverse/kafkastreamsprocessor/spi}/properties/GlobalDlqConfig.java (94%) rename {api/src/main/java/io/quarkiverse/kafkastreamsprocessor/api => spi/src/main/java/io/quarkiverse/kafkastreamsprocessor/spi}/properties/InputConfig.java (96%) rename {api/src/main/java/io/quarkiverse/kafkastreamsprocessor/api => spi/src/main/java/io/quarkiverse/kafkastreamsprocessor/spi}/properties/KStreamsProcessorConfig.java (96%) rename {api/src/main/java/io/quarkiverse/kafkastreamsprocessor/api => spi/src/main/java/io/quarkiverse/kafkastreamsprocessor/spi}/properties/OutputConfig.java (95%) rename {api/src/main/java/io/quarkiverse/kafkastreamsprocessor/api => spi/src/main/java/io/quarkiverse/kafkastreamsprocessor/spi}/properties/RetryConfig.java (97%) rename {api/src/main/java/io/quarkiverse/kafkastreamsprocessor/api => spi/src/main/java/io/quarkiverse/kafkastreamsprocessor/spi}/properties/SinkConfig.java (94%) rename {api/src/main/java/io/quarkiverse/kafkastreamsprocessor/api => spi/src/main/java/io/quarkiverse/kafkastreamsprocessor/spi}/properties/SourceConfig.java (94%) create mode 100644 spi/src/main/resources/META-INF/beans.xml diff --git a/api/src/main/java/io/quarkiverse/kafkastreamsprocessor/api/SinkToTopicMappingBuilder.java b/api/src/main/java/io/quarkiverse/kafkastreamsprocessor/api/SinkToTopicMappingBuilder.java deleted file mode 100644 index 948f40c..0000000 --- a/api/src/main/java/io/quarkiverse/kafkastreamsprocessor/api/SinkToTopicMappingBuilder.java +++ /dev/null @@ -1,56 +0,0 @@ -/*- - * #%L - * Quarkus Kafka Streams Processor - * %% - * Copyright (C) 2024 Amadeus s.a.s. - * %% - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - * #L% - */ -package io.quarkiverse.kafkastreamsprocessor.api; - -import java.util.Map; - -/** - * Object to inject to get access to the resolved mapping between sink and topic for a multi output processor, using the - * conventions set up by the framework based on config properties like: - * - *
- * kafkastreamsprocessor.output.sinks.pong.topic=pong-events
- * kafkastreamsprocessor.output.sinks.pang.topic=pang-events
- * 
- *

- * Where: - *

- * - */ -public interface SinkToTopicMappingBuilder { - /** - * Default sink name created by KafkaStreams if no sink is configured manually - */ - String DEFAULT_SINK_NAME = "emitter-channel"; - - /** - * Looks at the configuration and extracts from it the mapping from the sink to the Kafka topic. - *

- * This method is exposed so you can do any kind of technical postprocessing based on the Kafka topic and the sink - * names. - *

- * - * @return a map with keys the sink names and values the corresponding Kafka topic name - */ - Map sinkToTopicMapping(); -} diff --git a/api/src/main/java/io/quarkiverse/kafkastreamsprocessor/api/SourceToTopicsMappingBuilder.java b/api/src/main/java/io/quarkiverse/kafkastreamsprocessor/api/SourceToTopicsMappingBuilder.java deleted file mode 100644 index 50c8b78..0000000 --- a/api/src/main/java/io/quarkiverse/kafkastreamsprocessor/api/SourceToTopicsMappingBuilder.java +++ /dev/null @@ -1,57 +0,0 @@ -/*- - * #%L - * Quarkus Kafka Streams Processor - * %% - * Copyright (C) 2024 Amadeus s.a.s. - * %% - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - * #L% - */ -package io.quarkiverse.kafkastreamsprocessor.api; - -import java.util.Map; - -/** - * Object to inject to get access to the resolved mapping between input topics and sources for a multi input processor, using - * the - * conventions set up by the framework based on config properties like: - * - *
- * kafkastreamsprocessor.input.sources.pong.topics=pong-events
- * kafkastreamsprocessor.input.sources.pang.topics=pang-events,ping-events
- * 
- *

- * Where: - *

- * - */ -public interface SourceToTopicsMappingBuilder { - /** - * Default source name created by KafkaStreams if no source is configured manually - */ - String DEFAULT_SOURCE_NAME = "receiver-channel"; - - /** - * Looks at the configuration and extracts from it the mapping from the source to the Kafka topic(s). - *

- * This method is exposed so you can do any kind of technical postprocessing based on the Kafka topic and the source - * names. - *

- * - * @return a map with keys the sink names and values the corresponding list of Kafka topic names - */ - Map sourceToTopicsMapping(); -} diff --git a/bom/application/pom.xml b/bom/application/pom.xml index b2b5fa6..9e7f4c4 100644 --- a/bom/application/pom.xml +++ b/bom/application/pom.xml @@ -33,6 +33,11 @@ quarkus-kafka-streams-processor-impl ${quarkus-kafka-streams-processor.version} + + io.quarkiverse.kafkastreamsprocessor + quarkus-kafka-streams-processor-spi + ${quarkus-kafka-streams-processor.version} + io.quarkiverse.kafkastreamsprocessor quarkus-kafka-streams-processor-text-map-accessors diff --git a/bom/pom.xml b/bom/pom.xml index 1e93681..b25e67b 100644 --- a/bom/pom.xml +++ b/bom/pom.xml @@ -9,7 +9,7 @@ quarkus-kafka-streams-processor-bom-parent pom - 0.0.0-SNAPSHOT + 1.1.0-SNAPSHOT application diff --git a/deployment/pom.xml b/deployment/pom.xml index 61ed5a6..a3080f3 100644 --- a/deployment/pom.xml +++ b/deployment/pom.xml @@ -23,6 +23,10 @@ io.quarkiverse.kafkastreamsprocessor quarkus-kafka-streams-processor-api + + io.quarkiverse.kafkastreamsprocessor + quarkus-kafka-streams-processor-impl + io.quarkus quarkus-arc-deployment diff --git a/deployment/src/main/java/io/quarkiverse/kafkastreamsprocessor/kafka/streams/deployment/KafkaStreamsProcessorProcessor.java b/deployment/src/main/java/io/quarkiverse/kafkastreamsprocessor/kafka/streams/deployment/KafkaStreamsProcessorProcessor.java index 5d8e5f8..439ffc1 100644 --- a/deployment/src/main/java/io/quarkiverse/kafkastreamsprocessor/kafka/streams/deployment/KafkaStreamsProcessorProcessor.java +++ b/deployment/src/main/java/io/quarkiverse/kafkastreamsprocessor/kafka/streams/deployment/KafkaStreamsProcessorProcessor.java @@ -19,6 +19,8 @@ */ package io.quarkiverse.kafkastreamsprocessor.kafka.streams.deployment; +import java.util.Map; + import org.eclipse.microprofile.config.Config; import org.eclipse.microprofile.config.ConfigProvider; import org.jboss.jandex.AnnotationInstance; @@ -26,13 +28,17 @@ import org.jboss.jandex.DotName; import io.quarkiverse.kafkastreamsprocessor.api.Processor; +import io.quarkiverse.kafkastreamsprocessor.impl.SinkToTopicMappingBuilder; +import io.quarkiverse.kafkastreamsprocessor.impl.SourceToTopicsMappingBuilder; +import io.quarkiverse.kafkastreamsprocessor.runtime.KStreamsProcessorConfigRuntime; +import io.quarkiverse.kafkastreamsprocessor.spi.TopologyConfigBuildItem; import io.quarkus.deployment.annotations.BuildProducer; import io.quarkus.deployment.annotations.BuildStep; import io.quarkus.deployment.builditem.CombinedIndexBuildItem; import io.quarkus.deployment.builditem.FeatureBuildItem; import io.quarkus.deployment.builditem.nativeimage.ReflectiveClassBuildItem; -class KafkaStreamsProcessorProcessor { +public class KafkaStreamsProcessorProcessor { private static final String FEATURE = "kafka-streams-processor"; @@ -73,4 +79,16 @@ public void registerRetryExceptions(BuildProducer refl .fields(false) .build())); } + + @BuildStep + public void registerTopologyBuildItem(BuildProducer configMappingBuildItemProducer, + KStreamsProcessorConfigRuntime kStreamsProcessorConfig) { + Map sourceToTopicsMapping = new SourceToTopicsMappingBuilder(kStreamsProcessorConfig) + .sourceToTopicsMapping(); + Map sinkToTopicMapping = new SinkToTopicMappingBuilder(kStreamsProcessorConfig) + .sinkToTopicMapping(); + + configMappingBuildItemProducer + .produce(new TopologyConfigBuildItem(sourceToTopicsMapping, sinkToTopicMapping)); + } } diff --git a/deployment/src/test/java/io/quarkiverse/kafkastreamsprocessor/kafka/streams/test/KafkaStreamsProcessorProcessorTest.java b/deployment/src/test/java/io/quarkiverse/kafkastreamsprocessor/kafka/streams/test/KafkaStreamsProcessorProcessorTest.java index 7ca5620..79eab50 100644 --- a/deployment/src/test/java/io/quarkiverse/kafkastreamsprocessor/kafka/streams/test/KafkaStreamsProcessorProcessorTest.java +++ b/deployment/src/test/java/io/quarkiverse/kafkastreamsprocessor/kafka/streams/test/KafkaStreamsProcessorProcessorTest.java @@ -20,11 +20,15 @@ package io.quarkiverse.kafkastreamsprocessor.kafka.streams.test; import static org.hamcrest.MatcherAssert.assertThat; +import static org.hamcrest.Matchers.arrayContaining; +import static org.hamcrest.Matchers.equalTo; import static org.hamcrest.Matchers.hasItem; +import static org.hamcrest.Matchers.hasKey; import static org.junit.jupiter.api.Assertions.assertNotNull; import static org.junit.jupiter.api.Assertions.assertNull; import java.util.List; +import java.util.Map; import java.util.function.Consumer; import java.util.stream.Collectors; @@ -34,6 +38,7 @@ import org.junit.jupiter.api.extension.RegisterExtension; import io.quarkiverse.kafkastreamsprocessor.api.exception.RetryableException; +import io.quarkiverse.kafkastreamsprocessor.spi.TopologyConfigBuildItem; import io.quarkus.builder.BuildChainBuilder; import io.quarkus.deployment.builditem.GeneratedResourceBuildItem; import io.quarkus.deployment.builditem.nativeimage.ReflectiveClassBuildItem; @@ -43,6 +48,8 @@ public class KafkaStreamsProcessorProcessorTest { private static volatile List registeredClasses; + private static volatile TopologyConfigBuildItem topologyConfigBuildItem; + @RegisterExtension static QuarkusUnitTest runner = new QuarkusUnitTest() .setArchiveProducer(() -> ShrinkWrap.create(JavaArchive.class) @@ -56,9 +63,12 @@ private static Consumer buildCustomizer() { return chainBuilder -> chainBuilder.addBuildStep( context -> { registeredClasses = context.consumeMulti(ReflectiveClassBuildItem.class); + topologyConfigBuildItem = context.consume(TopologyConfigBuildItem.class); checkProperClassesAreRegistered(); + checkTopologyConfigurationBuildItem(); }) .consumes(ReflectiveClassBuildItem.class) + .consumes(TopologyConfigBuildItem.class) .produces(GeneratedResourceBuildItem.class) .build(); } @@ -75,6 +85,13 @@ private static void checkProperClassesAreRegistered() { assertThat(allRegisteredClasses, hasItem(RetryableException.class.getName())); } + private static void checkTopologyConfigurationBuildItem() { + assertThat(topologyConfigBuildItem.getSinkToTopicMapping(), equalTo(Map.of("emitter-channel", "pong-events"))); + assertThat(topologyConfigBuildItem.getSourceToTopicsMapping(), hasKey("receiver-channel")); + assertThat(topologyConfigBuildItem.getSourceToTopicsMapping().get("receiver-channel"), + arrayContaining(equalTo("ping-events"))); + } + @Test void shouldRegisterTypesForReflection() { // if it gets there, it succeeded diff --git a/impl/pom.xml b/impl/pom.xml index 9685013..c013752 100644 --- a/impl/pom.xml +++ b/impl/pom.xml @@ -33,6 +33,10 @@ io.quarkiverse.kafkastreamsprocessor quarkus-kafka-streams-processor-api + + io.quarkiverse.kafkastreamsprocessor + quarkus-kafka-streams-processor-spi + io.quarkiverse.kafkastreamsprocessor quarkus-kafka-streams-processor-text-map-accessors diff --git a/impl/src/main/java/io/quarkiverse/kafkastreamsprocessor/impl/DefaultSinkToTopicMappingBuilder.java b/impl/src/main/java/io/quarkiverse/kafkastreamsprocessor/impl/SinkToTopicMappingBuilder.java similarity index 69% rename from impl/src/main/java/io/quarkiverse/kafkastreamsprocessor/impl/DefaultSinkToTopicMappingBuilder.java rename to impl/src/main/java/io/quarkiverse/kafkastreamsprocessor/impl/SinkToTopicMappingBuilder.java index b2601c8..0701222 100644 --- a/impl/src/main/java/io/quarkiverse/kafkastreamsprocessor/impl/DefaultSinkToTopicMappingBuilder.java +++ b/impl/src/main/java/io/quarkiverse/kafkastreamsprocessor/impl/SinkToTopicMappingBuilder.java @@ -26,12 +26,26 @@ import jakarta.enterprise.context.ApplicationScoped; import jakarta.inject.Inject; -import io.quarkiverse.kafkastreamsprocessor.api.SinkToTopicMappingBuilder; -import io.quarkiverse.kafkastreamsprocessor.api.properties.KStreamsProcessorConfig; -import io.quarkiverse.kafkastreamsprocessor.api.properties.SinkConfig; +import io.quarkiverse.kafkastreamsprocessor.spi.properties.KStreamsProcessorConfig; +import io.quarkiverse.kafkastreamsprocessor.spi.properties.SinkConfig; import lombok.extern.slf4j.Slf4j; /** + * Object to inject to get access to the resolved mapping between sink and topic for a multi output processor, using the + * conventions set up by the framework based on config properties like: + * + *
+ * kafkastreamsprocessor.output.sinks.pong.topic=pong-events
+ * kafkastreamsprocessor.output.sinks.pang.topic=pang-events
+ * 
+ *

+ * Where: + *

+ *
    + *
  • pong and pang are the sinks
  • + *
  • pong-events and pang-events the Kafka topics
  • + *
+ * * Multi-output topic configuration. *

* Inspired by + * This method is exposed so you can do any kind of technical postprocessing based on the Kafka topic and the sink + * names. + *

+ * + * @return a map with keys the sink names and values the corresponding Kafka topic name */ - @Override public Map sinkToTopicMapping() { // Extract topic name for each sink if any has been configured Map sinkToTopicMapping = buildMapping(); diff --git a/impl/src/main/java/io/quarkiverse/kafkastreamsprocessor/impl/DefaultSourceToTopicsMappingBuilder.java b/impl/src/main/java/io/quarkiverse/kafkastreamsprocessor/impl/SourceToTopicsMappingBuilder.java similarity index 69% rename from impl/src/main/java/io/quarkiverse/kafkastreamsprocessor/impl/DefaultSourceToTopicsMappingBuilder.java rename to impl/src/main/java/io/quarkiverse/kafkastreamsprocessor/impl/SourceToTopicsMappingBuilder.java index 4a69592..0c84b9c 100644 --- a/impl/src/main/java/io/quarkiverse/kafkastreamsprocessor/impl/DefaultSourceToTopicsMappingBuilder.java +++ b/impl/src/main/java/io/quarkiverse/kafkastreamsprocessor/impl/SourceToTopicsMappingBuilder.java @@ -27,19 +27,38 @@ import jakarta.enterprise.context.ApplicationScoped; import jakarta.inject.Inject; -import io.quarkiverse.kafkastreamsprocessor.api.SourceToTopicsMappingBuilder; -import io.quarkiverse.kafkastreamsprocessor.api.properties.KStreamsProcessorConfig; -import io.quarkiverse.kafkastreamsprocessor.api.properties.SourceConfig; +import io.quarkiverse.kafkastreamsprocessor.spi.properties.KStreamsProcessorConfig; +import io.quarkiverse.kafkastreamsprocessor.spi.properties.SourceConfig; import lombok.extern.slf4j.Slf4j; /** + * Object to inject to get access to the resolved mapping between input topics and sources for a multi input processor, using + * the + * conventions set up by the framework based on config properties like: + * + *
+ * kafkastreamsprocessor.input.sources.pong.topics=pong-events
+ * kafkastreamsprocessor.input.sources.pang.topics=pang-events,ping-events
+ * 
+ *

+ * Where: + *

+ *
    + *
  • pong and pang are the sources
  • + *
  • ping-events, pong-events and pang-events the Kafka topics
  • + *
+ * * Multi-input topic configuration Inspired by
smallrye-reactive-messaging * ConfiguredChannelFactory */ @ApplicationScoped @Slf4j -public class DefaultSourceToTopicsMappingBuilder implements SourceToTopicsMappingBuilder { +public class SourceToTopicsMappingBuilder { + /** + * Default source name created by KafkaStreams if no source is configured manually + */ + String DEFAULT_SOURCE_NAME = "receiver-channel"; /** * Configuration of the extension */ @@ -52,14 +71,19 @@ public class DefaultSourceToTopicsMappingBuilder implements SourceToTopicsMappin * Configuration of the extension */ @Inject - public DefaultSourceToTopicsMappingBuilder(KStreamsProcessorConfig extensionConfiguration) { + public SourceToTopicsMappingBuilder(KStreamsProcessorConfig extensionConfiguration) { this.extensionConfiguration = extensionConfiguration; } /** - * {@inheritDoc} + * Looks at the configuration and extracts from it the mapping from the source to the Kafka topic(s). + *

+ * This method is exposed so you can do any kind of technical postprocessing based on the Kafka topic and the source + * names. + *

+ * + * @return a map with keys the sink names and values the corresponding list of Kafka topic names */ - @Override public Map sourceToTopicsMapping() { // Extract topic name for each channel Map sourceToTopicMapping = buildMapping(); diff --git a/impl/src/main/java/io/quarkiverse/kafkastreamsprocessor/impl/TopologyProducer.java b/impl/src/main/java/io/quarkiverse/kafkastreamsprocessor/impl/TopologyProducer.java index 48cadbf..78b1efa 100644 --- a/impl/src/main/java/io/quarkiverse/kafkastreamsprocessor/impl/TopologyProducer.java +++ b/impl/src/main/java/io/quarkiverse/kafkastreamsprocessor/impl/TopologyProducer.java @@ -34,15 +34,13 @@ import org.apache.kafka.streams.processor.api.Processor; import org.apache.kafka.streams.processor.api.ProcessorSupplier; -import io.quarkiverse.kafkastreamsprocessor.api.SinkToTopicMappingBuilder; -import io.quarkiverse.kafkastreamsprocessor.api.SourceToTopicsMappingBuilder; import io.quarkiverse.kafkastreamsprocessor.api.configuration.ConfigurationCustomizer; import io.quarkiverse.kafkastreamsprocessor.api.decorator.producer.ProducerOnSendInterceptor; -import io.quarkiverse.kafkastreamsprocessor.api.properties.KStreamsProcessorConfig; import io.quarkiverse.kafkastreamsprocessor.impl.configuration.DefaultConfigurationCustomizer; import io.quarkiverse.kafkastreamsprocessor.impl.configuration.DefaultTopologySerdesConfiguration; import io.quarkiverse.kafkastreamsprocessor.impl.configuration.TopologyConfigurationImpl; import io.quarkiverse.kafkastreamsprocessor.impl.configuration.TypeUtils; +import io.quarkiverse.kafkastreamsprocessor.spi.properties.KStreamsProcessorConfig; /** * Processor must be annotated with {@link io.quarkiverse.kafkastreamsprocessor.api.Processor} diff --git a/impl/src/main/java/io/quarkiverse/kafkastreamsprocessor/impl/decorator/processor/DlqDecorator.java b/impl/src/main/java/io/quarkiverse/kafkastreamsprocessor/impl/decorator/processor/DlqDecorator.java index f6eda44..af0e0c0 100644 --- a/impl/src/main/java/io/quarkiverse/kafkastreamsprocessor/impl/decorator/processor/DlqDecorator.java +++ b/impl/src/main/java/io/quarkiverse/kafkastreamsprocessor/impl/decorator/processor/DlqDecorator.java @@ -37,13 +37,13 @@ import org.apache.kafka.streams.processor.api.RecordMetadata; import org.apache.kafka.streams.processor.internals.InternalProcessorContext; -import io.quarkiverse.kafkastreamsprocessor.api.SinkToTopicMappingBuilder; import io.quarkiverse.kafkastreamsprocessor.api.decorator.processor.ProcessorDecoratorPriorities; -import io.quarkiverse.kafkastreamsprocessor.api.properties.KStreamsProcessorConfig; +import io.quarkiverse.kafkastreamsprocessor.impl.SinkToTopicMappingBuilder; import io.quarkiverse.kafkastreamsprocessor.impl.TopologyProducer; import io.quarkiverse.kafkastreamsprocessor.impl.errors.DlqMetadataHandler; import io.quarkiverse.kafkastreamsprocessor.impl.errors.ErrorHandlingStrategy; import io.quarkiverse.kafkastreamsprocessor.impl.metrics.KafkaStreamsProcessorMetrics; +import io.quarkiverse.kafkastreamsprocessor.spi.properties.KStreamsProcessorConfig; import lombok.AccessLevel; import lombok.RequiredArgsConstructor; diff --git a/impl/src/main/java/io/quarkiverse/kafkastreamsprocessor/impl/errors/GlobalDLQProductionExceptionHandlerDelegate.java b/impl/src/main/java/io/quarkiverse/kafkastreamsprocessor/impl/errors/GlobalDLQProductionExceptionHandlerDelegate.java index 189ed95..a13b6d5 100644 --- a/impl/src/main/java/io/quarkiverse/kafkastreamsprocessor/impl/errors/GlobalDLQProductionExceptionHandlerDelegate.java +++ b/impl/src/main/java/io/quarkiverse/kafkastreamsprocessor/impl/errors/GlobalDLQProductionExceptionHandlerDelegate.java @@ -33,9 +33,9 @@ import org.apache.kafka.streams.KafkaClientSupplier; import org.apache.kafka.streams.errors.ProductionExceptionHandler; -import io.quarkiverse.kafkastreamsprocessor.api.properties.KStreamsProcessorConfig; import io.quarkiverse.kafkastreamsprocessor.impl.KafkaClientSupplierDecorator; import io.quarkiverse.kafkastreamsprocessor.impl.metrics.KafkaStreamsProcessorMetrics; +import io.quarkiverse.kafkastreamsprocessor.spi.properties.KStreamsProcessorConfig; import io.quarkus.arc.Unremovable; import lombok.extern.slf4j.Slf4j; diff --git a/impl/src/main/java/io/quarkiverse/kafkastreamsprocessor/impl/errors/LogAndSendToDlqExceptionHandlerDelegate.java b/impl/src/main/java/io/quarkiverse/kafkastreamsprocessor/impl/errors/LogAndSendToDlqExceptionHandlerDelegate.java index 9a2bf88..8d55f1d 100644 --- a/impl/src/main/java/io/quarkiverse/kafkastreamsprocessor/impl/errors/LogAndSendToDlqExceptionHandlerDelegate.java +++ b/impl/src/main/java/io/quarkiverse/kafkastreamsprocessor/impl/errors/LogAndSendToDlqExceptionHandlerDelegate.java @@ -35,9 +35,9 @@ import org.apache.kafka.streams.processor.ProcessorContext; import io.quarkiverse.kafkastreamsprocessor.api.decorator.producer.ProducerOnSendInterceptor; -import io.quarkiverse.kafkastreamsprocessor.api.properties.KStreamsProcessorConfig; import io.quarkiverse.kafkastreamsprocessor.impl.KafkaClientSupplierDecorator; import io.quarkiverse.kafkastreamsprocessor.impl.metrics.KafkaStreamsProcessorMetrics; +import io.quarkiverse.kafkastreamsprocessor.spi.properties.KStreamsProcessorConfig; import io.quarkus.arc.Unremovable; import lombok.extern.slf4j.Slf4j; diff --git a/impl/src/test/java/io/quarkiverse/kafkastreamsprocessor/impl/CustomTopologyWithPojoDriverTest.java b/impl/src/test/java/io/quarkiverse/kafkastreamsprocessor/impl/CustomTopologyWithPojoDriverTest.java index 38006da..d8fa2da 100644 --- a/impl/src/test/java/io/quarkiverse/kafkastreamsprocessor/impl/CustomTopologyWithPojoDriverTest.java +++ b/impl/src/test/java/io/quarkiverse/kafkastreamsprocessor/impl/CustomTopologyWithPojoDriverTest.java @@ -46,9 +46,9 @@ import io.quarkiverse.kafkastreamsprocessor.api.Processor; import io.quarkiverse.kafkastreamsprocessor.api.configuration.Configuration; import io.quarkiverse.kafkastreamsprocessor.api.configuration.ConfigurationCustomizer; -import io.quarkiverse.kafkastreamsprocessor.api.properties.KStreamsProcessorConfig; import io.quarkiverse.kafkastreamsprocessor.api.serdes.JacksonSerde; import io.quarkiverse.kafkastreamsprocessor.api.serdes.JacksonSerializer; +import io.quarkiverse.kafkastreamsprocessor.spi.properties.KStreamsProcessorConfig; import io.quarkus.test.junit.QuarkusTest; import io.quarkus.test.junit.QuarkusTestProfile; import io.quarkus.test.junit.TestProfile; diff --git a/impl/src/test/java/io/quarkiverse/kafkastreamsprocessor/impl/JsonPayloadTopologyDriverTest.java b/impl/src/test/java/io/quarkiverse/kafkastreamsprocessor/impl/JsonPayloadTopologyDriverTest.java index 2804f75..71bdc91 100644 --- a/impl/src/test/java/io/quarkiverse/kafkastreamsprocessor/impl/JsonPayloadTopologyDriverTest.java +++ b/impl/src/test/java/io/quarkiverse/kafkastreamsprocessor/impl/JsonPayloadTopologyDriverTest.java @@ -40,9 +40,9 @@ import org.junit.jupiter.api.Test; import io.quarkiverse.kafkastreamsprocessor.api.Processor; -import io.quarkiverse.kafkastreamsprocessor.api.properties.KStreamsProcessorConfig; import io.quarkiverse.kafkastreamsprocessor.api.serdes.JacksonDeserializer; import io.quarkiverse.kafkastreamsprocessor.api.serdes.JacksonSerializer; +import io.quarkiverse.kafkastreamsprocessor.spi.properties.KStreamsProcessorConfig; import io.quarkus.test.junit.QuarkusTest; import io.quarkus.test.junit.QuarkusTestProfile; import io.quarkus.test.junit.TestProfile; diff --git a/impl/src/test/java/io/quarkiverse/kafkastreamsprocessor/impl/JsonPayloadWithCustomObjectMapperTopologyDriverTest.java b/impl/src/test/java/io/quarkiverse/kafkastreamsprocessor/impl/JsonPayloadWithCustomObjectMapperTopologyDriverTest.java index 9957dbe..7a59dea 100644 --- a/impl/src/test/java/io/quarkiverse/kafkastreamsprocessor/impl/JsonPayloadWithCustomObjectMapperTopologyDriverTest.java +++ b/impl/src/test/java/io/quarkiverse/kafkastreamsprocessor/impl/JsonPayloadWithCustomObjectMapperTopologyDriverTest.java @@ -44,7 +44,7 @@ import com.fasterxml.jackson.databind.PropertyNamingStrategies; import io.quarkiverse.kafkastreamsprocessor.api.Processor; -import io.quarkiverse.kafkastreamsprocessor.api.properties.KStreamsProcessorConfig; +import io.quarkiverse.kafkastreamsprocessor.spi.properties.KStreamsProcessorConfig; import io.quarkus.jackson.ObjectMapperCustomizer; import io.quarkus.test.junit.QuarkusTest; import io.quarkus.test.junit.QuarkusTestProfile; diff --git a/impl/src/test/java/io/quarkiverse/kafkastreamsprocessor/impl/KStreamMetricsQuarkusTest.java b/impl/src/test/java/io/quarkiverse/kafkastreamsprocessor/impl/KStreamMetricsQuarkusTest.java index 41d7d0b..73af99e 100644 --- a/impl/src/test/java/io/quarkiverse/kafkastreamsprocessor/impl/KStreamMetricsQuarkusTest.java +++ b/impl/src/test/java/io/quarkiverse/kafkastreamsprocessor/impl/KStreamMetricsQuarkusTest.java @@ -50,9 +50,9 @@ import com.github.daniel.shuy.kafka.protobuf.serde.KafkaProtobufSerializer; import io.quarkiverse.kafkastreamsprocessor.api.Processor; -import io.quarkiverse.kafkastreamsprocessor.api.properties.KStreamsProcessorConfig; import io.quarkiverse.kafkastreamsprocessor.impl.metrics.KafkaStreamsProcessorMetrics; import io.quarkiverse.kafkastreamsprocessor.sample.message.PingMessage; +import io.quarkiverse.kafkastreamsprocessor.spi.properties.KStreamsProcessorConfig; import io.quarkus.test.junit.QuarkusTest; import io.quarkus.test.junit.QuarkusTestProfile; import io.quarkus.test.junit.TestProfile; diff --git a/impl/src/test/java/io/quarkiverse/kafkastreamsprocessor/impl/KStreamTopologyDriverTest.java b/impl/src/test/java/io/quarkiverse/kafkastreamsprocessor/impl/KStreamTopologyDriverTest.java index 1a37490..3da5f76 100644 --- a/impl/src/test/java/io/quarkiverse/kafkastreamsprocessor/impl/KStreamTopologyDriverTest.java +++ b/impl/src/test/java/io/quarkiverse/kafkastreamsprocessor/impl/KStreamTopologyDriverTest.java @@ -63,11 +63,11 @@ import io.opentelemetry.context.Scope; import io.opentelemetry.sdk.OpenTelemetrySdk; import io.quarkiverse.kafkastreamsprocessor.api.Processor; -import io.quarkiverse.kafkastreamsprocessor.api.properties.KStreamsProcessorConfig; import io.quarkiverse.kafkastreamsprocessor.impl.protocol.KafkaStreamsProcessorHeaders; import io.quarkiverse.kafkastreamsprocessor.impl.utils.TestSpanExporter; import io.quarkiverse.kafkastreamsprocessor.propagation.KafkaTextMapSetter; import io.quarkiverse.kafkastreamsprocessor.sample.message.PingMessage.Ping; +import io.quarkiverse.kafkastreamsprocessor.spi.properties.KStreamsProcessorConfig; import io.quarkus.test.junit.QuarkusTest; import io.quarkus.test.junit.QuarkusTestProfile; import io.quarkus.test.junit.TestProfile; diff --git a/impl/src/test/java/io/quarkiverse/kafkastreamsprocessor/impl/KStreamTopologyWithDlqDriverTest.java b/impl/src/test/java/io/quarkiverse/kafkastreamsprocessor/impl/KStreamTopologyWithDlqDriverTest.java index 49d3f87..b81bffa 100644 --- a/impl/src/test/java/io/quarkiverse/kafkastreamsprocessor/impl/KStreamTopologyWithDlqDriverTest.java +++ b/impl/src/test/java/io/quarkiverse/kafkastreamsprocessor/impl/KStreamTopologyWithDlqDriverTest.java @@ -46,8 +46,8 @@ import com.github.daniel.shuy.kafka.protobuf.serde.KafkaProtobufSerializer; import io.quarkiverse.kafkastreamsprocessor.api.Processor; -import io.quarkiverse.kafkastreamsprocessor.api.properties.KStreamsProcessorConfig; import io.quarkiverse.kafkastreamsprocessor.sample.message.PingMessage; +import io.quarkiverse.kafkastreamsprocessor.spi.properties.KStreamsProcessorConfig; import io.quarkus.test.junit.QuarkusTest; import io.quarkus.test.junit.QuarkusTestProfile; import io.quarkus.test.junit.TestProfile; diff --git a/impl/src/test/java/io/quarkiverse/kafkastreamsprocessor/impl/Kafka2ProcessorTopologyDriverTest.java b/impl/src/test/java/io/quarkiverse/kafkastreamsprocessor/impl/Kafka2ProcessorTopologyDriverTest.java index b0c16ef..0dd9502 100644 --- a/impl/src/test/java/io/quarkiverse/kafkastreamsprocessor/impl/Kafka2ProcessorTopologyDriverTest.java +++ b/impl/src/test/java/io/quarkiverse/kafkastreamsprocessor/impl/Kafka2ProcessorTopologyDriverTest.java @@ -42,8 +42,8 @@ import com.github.daniel.shuy.kafka.protobuf.serde.KafkaProtobufSerializer; import io.quarkiverse.kafkastreamsprocessor.api.Processor; -import io.quarkiverse.kafkastreamsprocessor.api.properties.KStreamsProcessorConfig; import io.quarkiverse.kafkastreamsprocessor.sample.message.PingMessage.Ping; +import io.quarkiverse.kafkastreamsprocessor.spi.properties.KStreamsProcessorConfig; import io.quarkus.test.junit.QuarkusTest; import io.quarkus.test.junit.QuarkusTestProfile; import io.quarkus.test.junit.TestProfile; diff --git a/impl/src/test/java/io/quarkiverse/kafkastreamsprocessor/impl/DefaultSinkToTopicMappingBuilderTest.java b/impl/src/test/java/io/quarkiverse/kafkastreamsprocessor/impl/SinkToTopicMappingBuilderTest.java similarity index 89% rename from impl/src/test/java/io/quarkiverse/kafkastreamsprocessor/impl/DefaultSinkToTopicMappingBuilderTest.java rename to impl/src/test/java/io/quarkiverse/kafkastreamsprocessor/impl/SinkToTopicMappingBuilderTest.java index abb809a..a327c4d 100644 --- a/impl/src/test/java/io/quarkiverse/kafkastreamsprocessor/impl/DefaultSinkToTopicMappingBuilderTest.java +++ b/impl/src/test/java/io/quarkiverse/kafkastreamsprocessor/impl/SinkToTopicMappingBuilderTest.java @@ -33,12 +33,12 @@ import org.mockito.Mock; import org.mockito.junit.jupiter.MockitoExtension; -import io.quarkiverse.kafkastreamsprocessor.api.properties.KStreamsProcessorConfig; -import io.quarkiverse.kafkastreamsprocessor.api.properties.OutputConfig; -import io.quarkiverse.kafkastreamsprocessor.api.properties.SinkConfig; +import io.quarkiverse.kafkastreamsprocessor.spi.properties.KStreamsProcessorConfig; +import io.quarkiverse.kafkastreamsprocessor.spi.properties.OutputConfig; +import io.quarkiverse.kafkastreamsprocessor.spi.properties.SinkConfig; @ExtendWith(MockitoExtension.class) -public class DefaultSinkToTopicMappingBuilderTest { +public class SinkToTopicMappingBuilderTest { @Mock KStreamsProcessorConfig extensionConfiguration; @@ -47,11 +47,11 @@ public class DefaultSinkToTopicMappingBuilderTest { Map sinks = new HashMap<>(); - DefaultSinkToTopicMappingBuilder sinkConfiguration; + SinkToTopicMappingBuilder sinkConfiguration; @BeforeEach void setUp() { - sinkConfiguration = new DefaultSinkToTopicMappingBuilder(extensionConfiguration); + sinkConfiguration = new SinkToTopicMappingBuilder(extensionConfiguration); when(extensionConfiguration.output()).thenReturn(outputConfig); when(outputConfig.sinks()).thenReturn(sinks); } diff --git a/impl/src/test/java/io/quarkiverse/kafkastreamsprocessor/impl/DefaultSourceToTopicsMappingBuilderTest.java b/impl/src/test/java/io/quarkiverse/kafkastreamsprocessor/impl/SourceToTopicsMappingBuilderTest.java similarity index 92% rename from impl/src/test/java/io/quarkiverse/kafkastreamsprocessor/impl/DefaultSourceToTopicsMappingBuilderTest.java rename to impl/src/test/java/io/quarkiverse/kafkastreamsprocessor/impl/SourceToTopicsMappingBuilderTest.java index 4b90d8d..258051a 100644 --- a/impl/src/test/java/io/quarkiverse/kafkastreamsprocessor/impl/DefaultSourceToTopicsMappingBuilderTest.java +++ b/impl/src/test/java/io/quarkiverse/kafkastreamsprocessor/impl/SourceToTopicsMappingBuilderTest.java @@ -34,12 +34,12 @@ import org.mockito.Mock; import org.mockito.junit.jupiter.MockitoExtension; -import io.quarkiverse.kafkastreamsprocessor.api.properties.InputConfig; -import io.quarkiverse.kafkastreamsprocessor.api.properties.KStreamsProcessorConfig; -import io.quarkiverse.kafkastreamsprocessor.api.properties.SourceConfig; +import io.quarkiverse.kafkastreamsprocessor.spi.properties.InputConfig; +import io.quarkiverse.kafkastreamsprocessor.spi.properties.KStreamsProcessorConfig; +import io.quarkiverse.kafkastreamsprocessor.spi.properties.SourceConfig; @ExtendWith(MockitoExtension.class) -public class DefaultSourceToTopicsMappingBuilderTest { +public class SourceToTopicsMappingBuilderTest { @Mock KStreamsProcessorConfig extensionConfiguration; @@ -48,11 +48,11 @@ public class DefaultSourceToTopicsMappingBuilderTest { Map sources = new HashMap<>(); - DefaultSourceToTopicsMappingBuilder sourceConfiguration; + SourceToTopicsMappingBuilder sourceConfiguration; @BeforeEach void setUp() { - sourceConfiguration = new DefaultSourceToTopicsMappingBuilder(extensionConfiguration); + sourceConfiguration = new SourceToTopicsMappingBuilder(extensionConfiguration); when(extensionConfiguration.input()).thenReturn(inputConfig); when(inputConfig.sources()).thenReturn(sources); } diff --git a/impl/src/test/java/io/quarkiverse/kafkastreamsprocessor/impl/TopologyProducerTest.java b/impl/src/test/java/io/quarkiverse/kafkastreamsprocessor/impl/TopologyProducerTest.java index 7fb6062..5084ebd 100644 --- a/impl/src/test/java/io/quarkiverse/kafkastreamsprocessor/impl/TopologyProducerTest.java +++ b/impl/src/test/java/io/quarkiverse/kafkastreamsprocessor/impl/TopologyProducerTest.java @@ -49,14 +49,12 @@ import org.mockito.Mock; import org.mockito.junit.jupiter.MockitoExtension; -import io.quarkiverse.kafkastreamsprocessor.api.SinkToTopicMappingBuilder; -import io.quarkiverse.kafkastreamsprocessor.api.SourceToTopicsMappingBuilder; import io.quarkiverse.kafkastreamsprocessor.api.configuration.ConfigurationCustomizer; import io.quarkiverse.kafkastreamsprocessor.api.configuration.store.StoreConfiguration; import io.quarkiverse.kafkastreamsprocessor.api.decorator.producer.ProducerOnSendInterceptor; -import io.quarkiverse.kafkastreamsprocessor.api.properties.DlqConfig; -import io.quarkiverse.kafkastreamsprocessor.api.properties.KStreamsProcessorConfig; import io.quarkiverse.kafkastreamsprocessor.impl.configuration.TopologyConfigurationImpl; +import io.quarkiverse.kafkastreamsprocessor.spi.properties.DlqConfig; +import io.quarkiverse.kafkastreamsprocessor.spi.properties.KStreamsProcessorConfig; @ExtendWith(MockitoExtension.class) class TopologyProducerTest { diff --git a/impl/src/test/java/io/quarkiverse/kafkastreamsprocessor/impl/decorator/processor/CdiRequestContextDecoratorQuarkusTest.java b/impl/src/test/java/io/quarkiverse/kafkastreamsprocessor/impl/decorator/processor/CdiRequestContextDecoratorQuarkusTest.java index 25bdff5..3280520 100644 --- a/impl/src/test/java/io/quarkiverse/kafkastreamsprocessor/impl/decorator/processor/CdiRequestContextDecoratorQuarkusTest.java +++ b/impl/src/test/java/io/quarkiverse/kafkastreamsprocessor/impl/decorator/processor/CdiRequestContextDecoratorQuarkusTest.java @@ -46,9 +46,9 @@ import com.github.daniel.shuy.kafka.protobuf.serde.KafkaProtobufDeserializer; import com.github.daniel.shuy.kafka.protobuf.serde.KafkaProtobufSerializer; -import io.quarkiverse.kafkastreamsprocessor.api.properties.KStreamsProcessorConfig; import io.quarkiverse.kafkastreamsprocessor.impl.decorator.request.RequestScopeConsumerProcessor; import io.quarkiverse.kafkastreamsprocessor.sample.message.PingMessage; +import io.quarkiverse.kafkastreamsprocessor.spi.properties.KStreamsProcessorConfig; import io.quarkus.test.junit.QuarkusTest; import io.quarkus.test.junit.QuarkusTestProfile; import io.quarkus.test.junit.TestProfile; diff --git a/impl/src/test/java/io/quarkiverse/kafkastreamsprocessor/impl/decorator/processor/RetryDecoratorQuarkusTest.java b/impl/src/test/java/io/quarkiverse/kafkastreamsprocessor/impl/decorator/processor/RetryDecoratorQuarkusTest.java index 48608d9..5bdd17a 100644 --- a/impl/src/test/java/io/quarkiverse/kafkastreamsprocessor/impl/decorator/processor/RetryDecoratorQuarkusTest.java +++ b/impl/src/test/java/io/quarkiverse/kafkastreamsprocessor/impl/decorator/processor/RetryDecoratorQuarkusTest.java @@ -50,8 +50,8 @@ import io.micrometer.core.instrument.Tag; import io.quarkiverse.kafkastreamsprocessor.api.Processor; import io.quarkiverse.kafkastreamsprocessor.api.exception.RetryableException; -import io.quarkiverse.kafkastreamsprocessor.api.properties.KStreamsProcessorConfig; import io.quarkiverse.kafkastreamsprocessor.sample.message.PingMessage; +import io.quarkiverse.kafkastreamsprocessor.spi.properties.KStreamsProcessorConfig; import io.quarkus.test.junit.QuarkusTest; import io.quarkus.test.junit.QuarkusTestProfile; import io.quarkus.test.junit.TestProfile; diff --git a/impl/src/test/java/io/quarkiverse/kafkastreamsprocessor/impl/decorator/producer/OrderedProducerInterceptorQuarkusTest.java b/impl/src/test/java/io/quarkiverse/kafkastreamsprocessor/impl/decorator/producer/OrderedProducerInterceptorQuarkusTest.java index 2cf6cec..b995d0b 100644 --- a/impl/src/test/java/io/quarkiverse/kafkastreamsprocessor/impl/decorator/producer/OrderedProducerInterceptorQuarkusTest.java +++ b/impl/src/test/java/io/quarkiverse/kafkastreamsprocessor/impl/decorator/producer/OrderedProducerInterceptorQuarkusTest.java @@ -51,8 +51,8 @@ import io.quarkiverse.kafkastreamsprocessor.api.Processor; import io.quarkiverse.kafkastreamsprocessor.api.decorator.producer.ProducerOnSendInterceptor; -import io.quarkiverse.kafkastreamsprocessor.api.properties.KStreamsProcessorConfig; import io.quarkiverse.kafkastreamsprocessor.sample.message.PingMessage; +import io.quarkiverse.kafkastreamsprocessor.spi.properties.KStreamsProcessorConfig; import io.quarkus.test.junit.QuarkusTest; import io.quarkus.test.junit.QuarkusTestProfile; import io.quarkus.test.junit.TestProfile; diff --git a/impl/src/test/java/io/quarkiverse/kafkastreamsprocessor/impl/decorator/producer/ProducerInterceptorQuarkusTest.java b/impl/src/test/java/io/quarkiverse/kafkastreamsprocessor/impl/decorator/producer/ProducerInterceptorQuarkusTest.java index 9d1781b..92949d4 100644 --- a/impl/src/test/java/io/quarkiverse/kafkastreamsprocessor/impl/decorator/producer/ProducerInterceptorQuarkusTest.java +++ b/impl/src/test/java/io/quarkiverse/kafkastreamsprocessor/impl/decorator/producer/ProducerInterceptorQuarkusTest.java @@ -51,8 +51,8 @@ import io.quarkiverse.kafkastreamsprocessor.api.Processor; import io.quarkiverse.kafkastreamsprocessor.api.decorator.producer.ProducerOnSendInterceptor; -import io.quarkiverse.kafkastreamsprocessor.api.properties.KStreamsProcessorConfig; import io.quarkiverse.kafkastreamsprocessor.sample.message.PingMessage; +import io.quarkiverse.kafkastreamsprocessor.spi.properties.KStreamsProcessorConfig; import io.quarkus.test.junit.QuarkusTest; import io.quarkus.test.junit.QuarkusTestProfile; import io.quarkus.test.junit.TestProfile; diff --git a/impl/src/test/java/io/quarkiverse/kafkastreamsprocessor/impl/errors/ErrorHandlingStrategyQuarkusTest.java b/impl/src/test/java/io/quarkiverse/kafkastreamsprocessor/impl/errors/ErrorHandlingStrategyQuarkusTest.java index bcca09f..9148c62 100644 --- a/impl/src/test/java/io/quarkiverse/kafkastreamsprocessor/impl/errors/ErrorHandlingStrategyQuarkusTest.java +++ b/impl/src/test/java/io/quarkiverse/kafkastreamsprocessor/impl/errors/ErrorHandlingStrategyQuarkusTest.java @@ -50,8 +50,8 @@ import com.github.daniel.shuy.kafka.protobuf.serde.KafkaProtobufSerializer; import io.quarkiverse.kafkastreamsprocessor.api.Processor; -import io.quarkiverse.kafkastreamsprocessor.api.properties.KStreamsProcessorConfig; import io.quarkiverse.kafkastreamsprocessor.sample.message.PingMessage; +import io.quarkiverse.kafkastreamsprocessor.spi.properties.KStreamsProcessorConfig; import io.quarkus.test.junit.QuarkusTest; import io.quarkus.test.junit.QuarkusTestProfile; import io.quarkus.test.junit.TestProfile; diff --git a/impl/src/test/java/io/quarkiverse/kafkastreamsprocessor/impl/errors/GlobalDLQProductionExceptionHandlerDelegateTest.java b/impl/src/test/java/io/quarkiverse/kafkastreamsprocessor/impl/errors/GlobalDLQProductionExceptionHandlerDelegateTest.java index 245d9a1..ec4746d 100644 --- a/impl/src/test/java/io/quarkiverse/kafkastreamsprocessor/impl/errors/GlobalDLQProductionExceptionHandlerDelegateTest.java +++ b/impl/src/test/java/io/quarkiverse/kafkastreamsprocessor/impl/errors/GlobalDLQProductionExceptionHandlerDelegateTest.java @@ -45,10 +45,10 @@ import org.mockito.Mock; import org.mockito.junit.jupiter.MockitoExtension; -import io.quarkiverse.kafkastreamsprocessor.api.properties.GlobalDlqConfig; -import io.quarkiverse.kafkastreamsprocessor.api.properties.KStreamsProcessorConfig; import io.quarkiverse.kafkastreamsprocessor.impl.TestException; import io.quarkiverse.kafkastreamsprocessor.impl.metrics.MockKafkaStreamsProcessorMetrics; +import io.quarkiverse.kafkastreamsprocessor.spi.properties.GlobalDlqConfig; +import io.quarkiverse.kafkastreamsprocessor.spi.properties.KStreamsProcessorConfig; @ExtendWith(MockitoExtension.class) class GlobalDLQProductionExceptionHandlerDelegateTest { diff --git a/impl/src/test/java/io/quarkiverse/kafkastreamsprocessor/impl/errors/GlobalDLQProductionExceptionHandlerQuarkusTest.java b/impl/src/test/java/io/quarkiverse/kafkastreamsprocessor/impl/errors/GlobalDLQProductionExceptionHandlerQuarkusTest.java index 49dd0e2..a1dc486 100644 --- a/impl/src/test/java/io/quarkiverse/kafkastreamsprocessor/impl/errors/GlobalDLQProductionExceptionHandlerQuarkusTest.java +++ b/impl/src/test/java/io/quarkiverse/kafkastreamsprocessor/impl/errors/GlobalDLQProductionExceptionHandlerQuarkusTest.java @@ -53,8 +53,8 @@ import de.svenjacobs.loremipsum.LoremIpsum; import io.micrometer.core.instrument.MeterRegistry; import io.quarkiverse.kafkastreamsprocessor.api.Processor; -import io.quarkiverse.kafkastreamsprocessor.api.properties.KStreamsProcessorConfig; import io.quarkiverse.kafkastreamsprocessor.sample.message.PingMessage; +import io.quarkiverse.kafkastreamsprocessor.spi.properties.KStreamsProcessorConfig; import io.quarkus.test.junit.QuarkusTest; import io.quarkus.test.junit.QuarkusTestProfile; import io.quarkus.test.junit.TestProfile; diff --git a/impl/src/test/java/io/quarkiverse/kafkastreamsprocessor/impl/errors/LogAndSendToDlqExceptionHandlerDelegateQuarkusTest.java b/impl/src/test/java/io/quarkiverse/kafkastreamsprocessor/impl/errors/LogAndSendToDlqExceptionHandlerDelegateQuarkusTest.java index e8d13f3..71c63c3 100644 --- a/impl/src/test/java/io/quarkiverse/kafkastreamsprocessor/impl/errors/LogAndSendToDlqExceptionHandlerDelegateQuarkusTest.java +++ b/impl/src/test/java/io/quarkiverse/kafkastreamsprocessor/impl/errors/LogAndSendToDlqExceptionHandlerDelegateQuarkusTest.java @@ -54,8 +54,8 @@ import io.micrometer.core.instrument.MeterRegistry; import io.quarkiverse.kafkastreamsprocessor.api.Processor; -import io.quarkiverse.kafkastreamsprocessor.api.properties.KStreamsProcessorConfig; import io.quarkiverse.kafkastreamsprocessor.sample.message.PingMessage; +import io.quarkiverse.kafkastreamsprocessor.spi.properties.KStreamsProcessorConfig; import io.quarkus.test.junit.QuarkusTest; import io.quarkus.test.junit.QuarkusTestProfile; import io.quarkus.test.junit.TestProfile; diff --git a/impl/src/test/java/io/quarkiverse/kafkastreamsprocessor/impl/errors/LogAndSendToDlqExceptionHandlerDelegateTest.java b/impl/src/test/java/io/quarkiverse/kafkastreamsprocessor/impl/errors/LogAndSendToDlqExceptionHandlerDelegateTest.java index 2770d5e..e0a8c5b 100644 --- a/impl/src/test/java/io/quarkiverse/kafkastreamsprocessor/impl/errors/LogAndSendToDlqExceptionHandlerDelegateTest.java +++ b/impl/src/test/java/io/quarkiverse/kafkastreamsprocessor/impl/errors/LogAndSendToDlqExceptionHandlerDelegateTest.java @@ -50,10 +50,10 @@ import org.mockito.Mock; import org.mockito.junit.jupiter.MockitoExtension; -import io.quarkiverse.kafkastreamsprocessor.api.properties.DlqConfig; -import io.quarkiverse.kafkastreamsprocessor.api.properties.KStreamsProcessorConfig; import io.quarkiverse.kafkastreamsprocessor.impl.metrics.KafkaStreamsProcessorMetrics; import io.quarkiverse.kafkastreamsprocessor.impl.metrics.MockKafkaStreamsProcessorMetrics; +import io.quarkiverse.kafkastreamsprocessor.spi.properties.DlqConfig; +import io.quarkiverse.kafkastreamsprocessor.spi.properties.KStreamsProcessorConfig; @ExtendWith(MockitoExtension.class) class LogAndSendToDlqExceptionHandlerDelegateTest { diff --git a/integration-tests/json-pojo/pom.xml b/integration-tests/json-pojo/pom.xml index 351eec5..01f4d4d 100644 --- a/integration-tests/json-pojo/pom.xml +++ b/integration-tests/json-pojo/pom.xml @@ -69,6 +69,12 @@ quarkus-opentelemetry runtime
+ + + io.quarkiverse.kafkastreamsprocessor + quarkus-kafka-streams-processor-impl + runtime + org.apache.kafka diff --git a/integration-tests/json-pojo/src/test/java/io/quarkiverse/kafkastreamsprocessor/sample/jsonpojo/PojoProcessorQuarkusTest.java b/integration-tests/json-pojo/src/test/java/io/quarkiverse/kafkastreamsprocessor/sample/jsonpojo/PojoProcessorQuarkusTest.java index 5e85a97..5335bf7 100644 --- a/integration-tests/json-pojo/src/test/java/io/quarkiverse/kafkastreamsprocessor/sample/jsonpojo/PojoProcessorQuarkusTest.java +++ b/integration-tests/json-pojo/src/test/java/io/quarkiverse/kafkastreamsprocessor/sample/jsonpojo/PojoProcessorQuarkusTest.java @@ -44,7 +44,7 @@ import com.fasterxml.jackson.core.JsonProcessingException; import com.fasterxml.jackson.databind.ObjectMapper; -import io.quarkiverse.kafkastreamsprocessor.api.properties.KStreamsProcessorConfig; +import io.quarkiverse.kafkastreamsprocessor.spi.properties.KStreamsProcessorConfig; import io.quarkus.test.junit.QuarkusTest; @QuarkusTest diff --git a/integration-tests/json-pojo/src/test/java/io/quarkiverse/kafkastreamsprocessor/sample/jsonpojo/PojoProcessorTopologyTest.java b/integration-tests/json-pojo/src/test/java/io/quarkiverse/kafkastreamsprocessor/sample/jsonpojo/PojoProcessorTopologyTest.java index 312c0af..752318a 100644 --- a/integration-tests/json-pojo/src/test/java/io/quarkiverse/kafkastreamsprocessor/sample/jsonpojo/PojoProcessorTopologyTest.java +++ b/integration-tests/json-pojo/src/test/java/io/quarkiverse/kafkastreamsprocessor/sample/jsonpojo/PojoProcessorTopologyTest.java @@ -36,7 +36,7 @@ import org.junit.jupiter.api.BeforeEach; import org.junit.jupiter.api.Test; -import io.quarkiverse.kafkastreamsprocessor.api.properties.KStreamsProcessorConfig; +import io.quarkiverse.kafkastreamsprocessor.spi.properties.KStreamsProcessorConfig; import io.quarkus.test.junit.QuarkusTest; @QuarkusTest diff --git a/integration-tests/kafka-to-rest/pom.xml b/integration-tests/kafka-to-rest/pom.xml index aafe75a..078c560 100644 --- a/integration-tests/kafka-to-rest/pom.xml +++ b/integration-tests/kafka-to-rest/pom.xml @@ -102,6 +102,11 @@ quarkus-kafka-streams-processor runtime + + io.quarkiverse.kafkastreamsprocessor + quarkus-kafka-streams-processor-impl + runtime + io.quarkus quarkus-smallrye-health diff --git a/integration-tests/kafka-to-rest/src/test/java/io/quarkiverse/kafkastreamsprocessor/sample/kafkatorest/PingClientProcessorQuarkusTest.java b/integration-tests/kafka-to-rest/src/test/java/io/quarkiverse/kafkastreamsprocessor/sample/kafkatorest/PingClientProcessorQuarkusTest.java index 0fc511c..a0281d3 100644 --- a/integration-tests/kafka-to-rest/src/test/java/io/quarkiverse/kafkastreamsprocessor/sample/kafkatorest/PingClientProcessorQuarkusTest.java +++ b/integration-tests/kafka-to-rest/src/test/java/io/quarkiverse/kafkastreamsprocessor/sample/kafkatorest/PingClientProcessorQuarkusTest.java @@ -45,8 +45,8 @@ import com.github.daniel.shuy.kafka.protobuf.serde.KafkaProtobufDeserializer; import com.github.daniel.shuy.kafka.protobuf.serde.KafkaProtobufSerializer; -import io.quarkiverse.kafkastreamsprocessor.api.properties.KStreamsProcessorConfig; import io.quarkiverse.kafkastreamsprocessor.sample.message.PingMessage.Ping; +import io.quarkiverse.kafkastreamsprocessor.spi.properties.KStreamsProcessorConfig; import io.quarkus.test.junit.QuarkusTest; @QuarkusTest diff --git a/integration-tests/kafka-to-rest/src/test/java/io/quarkiverse/kafkastreamsprocessor/sample/kafkatorest/PingClientProcessorQuarkusWithRetryTest.java b/integration-tests/kafka-to-rest/src/test/java/io/quarkiverse/kafkastreamsprocessor/sample/kafkatorest/PingClientProcessorQuarkusWithRetryTest.java index 6d4d061..1bf213c 100644 --- a/integration-tests/kafka-to-rest/src/test/java/io/quarkiverse/kafkastreamsprocessor/sample/kafkatorest/PingClientProcessorQuarkusWithRetryTest.java +++ b/integration-tests/kafka-to-rest/src/test/java/io/quarkiverse/kafkastreamsprocessor/sample/kafkatorest/PingClientProcessorQuarkusWithRetryTest.java @@ -48,8 +48,8 @@ import com.github.daniel.shuy.kafka.protobuf.serde.KafkaProtobufSerializer; import io.quarkiverse.kafkastreamsprocessor.api.exception.RetryableException; -import io.quarkiverse.kafkastreamsprocessor.api.properties.KStreamsProcessorConfig; import io.quarkiverse.kafkastreamsprocessor.sample.message.PingMessage.Ping; +import io.quarkiverse.kafkastreamsprocessor.spi.properties.KStreamsProcessorConfig; import io.quarkus.test.InjectMock; import io.quarkus.test.junit.QuarkusTest; diff --git a/integration-tests/kafka-to-rest/src/test/java/io/quarkiverse/kafkastreamsprocessor/sample/kafkatorest/PingClientProcessorQuarkusWithoutRetryCatchTest.java b/integration-tests/kafka-to-rest/src/test/java/io/quarkiverse/kafkastreamsprocessor/sample/kafkatorest/PingClientProcessorQuarkusWithoutRetryCatchTest.java index 707f391..941a82f 100644 --- a/integration-tests/kafka-to-rest/src/test/java/io/quarkiverse/kafkastreamsprocessor/sample/kafkatorest/PingClientProcessorQuarkusWithoutRetryCatchTest.java +++ b/integration-tests/kafka-to-rest/src/test/java/io/quarkiverse/kafkastreamsprocessor/sample/kafkatorest/PingClientProcessorQuarkusWithoutRetryCatchTest.java @@ -44,8 +44,8 @@ import com.github.daniel.shuy.kafka.protobuf.serde.KafkaProtobufDeserializer; import com.github.daniel.shuy.kafka.protobuf.serde.KafkaProtobufSerializer; -import io.quarkiverse.kafkastreamsprocessor.api.properties.KStreamsProcessorConfig; import io.quarkiverse.kafkastreamsprocessor.sample.message.PingMessage.Ping; +import io.quarkiverse.kafkastreamsprocessor.spi.properties.KStreamsProcessorConfig; import io.quarkus.test.InjectMock; import io.quarkus.test.junit.QuarkusTest; diff --git a/integration-tests/multioutput/pom.xml b/integration-tests/multioutput/pom.xml index ef097ee..f093dc1 100644 --- a/integration-tests/multioutput/pom.xml +++ b/integration-tests/multioutput/pom.xml @@ -85,6 +85,11 @@ quarkus-kafka-streams-processor runtime + + io.quarkiverse.kafkastreamsprocessor + quarkus-kafka-streams-processor-impl + runtime + io.quarkus quarkus-smallrye-health diff --git a/integration-tests/multioutput/src/test/java/io/quarkiverse/kafkastreamsprocessor/sample/multioutput/PingProcessorTopologyTest.java b/integration-tests/multioutput/src/test/java/io/quarkiverse/kafkastreamsprocessor/sample/multioutput/PingProcessorTopologyTest.java index 3523979..1d9a335 100644 --- a/integration-tests/multioutput/src/test/java/io/quarkiverse/kafkastreamsprocessor/sample/multioutput/PingProcessorTopologyTest.java +++ b/integration-tests/multioutput/src/test/java/io/quarkiverse/kafkastreamsprocessor/sample/multioutput/PingProcessorTopologyTest.java @@ -41,8 +41,8 @@ import com.github.daniel.shuy.kafka.protobuf.serde.KafkaProtobufDeserializer; import com.github.daniel.shuy.kafka.protobuf.serde.KafkaProtobufSerializer; -import io.quarkiverse.kafkastreamsprocessor.api.properties.KStreamsProcessorConfig; import io.quarkiverse.kafkastreamsprocessor.sample.message.PingMessage.Ping; +import io.quarkiverse.kafkastreamsprocessor.spi.properties.KStreamsProcessorConfig; import io.quarkus.test.junit.QuarkusTest; /** diff --git a/integration-tests/simple/pom.xml b/integration-tests/simple/pom.xml index e63db67..253f647 100644 --- a/integration-tests/simple/pom.xml +++ b/integration-tests/simple/pom.xml @@ -54,6 +54,11 @@ quarkus-kafka-streams-processor runtime + + io.quarkiverse.kafkastreamsprocessor + quarkus-kafka-streams-processor-impl + runtime + io.quarkus quarkus-smallrye-health diff --git a/integration-tests/simple/src/test/java/io/quarkiverse/kafkastreamsprocessor/sample/simple/PingProcessorTopologyTest.java b/integration-tests/simple/src/test/java/io/quarkiverse/kafkastreamsprocessor/sample/simple/PingProcessorTopologyTest.java index 75e8a55..9f6c54b 100644 --- a/integration-tests/simple/src/test/java/io/quarkiverse/kafkastreamsprocessor/sample/simple/PingProcessorTopologyTest.java +++ b/integration-tests/simple/src/test/java/io/quarkiverse/kafkastreamsprocessor/sample/simple/PingProcessorTopologyTest.java @@ -43,8 +43,8 @@ import com.github.daniel.shuy.kafka.protobuf.serde.KafkaProtobufDeserializer; import com.github.daniel.shuy.kafka.protobuf.serde.KafkaProtobufSerializer; -import io.quarkiverse.kafkastreamsprocessor.api.properties.KStreamsProcessorConfig; import io.quarkiverse.kafkastreamsprocessor.sample.message.PingMessage.Ping; +import io.quarkiverse.kafkastreamsprocessor.spi.properties.KStreamsProcessorConfig; import io.quarkus.test.junit.QuarkusTest; @QuarkusTest diff --git a/integration-tests/simple/src/test/java/io/quarkiverse/kafkastreamsprocessor/sample/simple/ProcessorHealthCheckQuarkusTest.java b/integration-tests/simple/src/test/java/io/quarkiverse/kafkastreamsprocessor/sample/simple/ProcessorHealthCheckQuarkusTest.java index 78b286b..57ceb4d 100644 --- a/integration-tests/simple/src/test/java/io/quarkiverse/kafkastreamsprocessor/sample/simple/ProcessorHealthCheckQuarkusTest.java +++ b/integration-tests/simple/src/test/java/io/quarkiverse/kafkastreamsprocessor/sample/simple/ProcessorHealthCheckQuarkusTest.java @@ -38,7 +38,7 @@ import org.junit.jupiter.api.Test; import org.junit.jupiter.api.Timeout; -import io.quarkiverse.kafkastreamsprocessor.api.properties.KStreamsProcessorConfig; +import io.quarkiverse.kafkastreamsprocessor.spi.properties.KStreamsProcessorConfig; import io.quarkus.test.junit.QuarkusTest; import io.quarkus.test.junit.QuarkusTestProfile; import io.quarkus.test.junit.TestProfile; diff --git a/integration-tests/stateful/pom.xml b/integration-tests/stateful/pom.xml index 9b79e58..de21ba7 100644 --- a/integration-tests/stateful/pom.xml +++ b/integration-tests/stateful/pom.xml @@ -51,6 +51,11 @@ quarkus-kafka-streams-processor runtime + + io.quarkiverse.kafkastreamsprocessor + quarkus-kafka-streams-processor-impl + runtime + io.quarkus quarkus-smallrye-health diff --git a/integration-tests/stateful/src/test/java/io/quarkiverse/kafkastreamsprocessor/sample/stateful/PingProcessorTopologyTest.java b/integration-tests/stateful/src/test/java/io/quarkiverse/kafkastreamsprocessor/sample/stateful/PingProcessorTopologyTest.java index 4c3a5e2..c9abfe9 100644 --- a/integration-tests/stateful/src/test/java/io/quarkiverse/kafkastreamsprocessor/sample/stateful/PingProcessorTopologyTest.java +++ b/integration-tests/stateful/src/test/java/io/quarkiverse/kafkastreamsprocessor/sample/stateful/PingProcessorTopologyTest.java @@ -42,8 +42,8 @@ import com.github.daniel.shuy.kafka.protobuf.serde.KafkaProtobufDeserializer; import com.github.daniel.shuy.kafka.protobuf.serde.KafkaProtobufSerializer; -import io.quarkiverse.kafkastreamsprocessor.api.properties.KStreamsProcessorConfig; import io.quarkiverse.kafkastreamsprocessor.sample.message.PingMessage.Ping; +import io.quarkiverse.kafkastreamsprocessor.spi.properties.KStreamsProcessorConfig; import io.quarkiverse.kafkastreamsprocessor.testframework.StateDirCleaningResource; import io.quarkus.test.common.QuarkusTestResource; import io.quarkus.test.junit.QuarkusTest; diff --git a/integration-tests/stateful/src/test/java/io/quarkiverse/kafkastreamsprocessor/sample/stateful/ProcessorHealthCheckQuarkusTest.java b/integration-tests/stateful/src/test/java/io/quarkiverse/kafkastreamsprocessor/sample/stateful/ProcessorHealthCheckQuarkusTest.java index 9c5a898..8f2d599 100644 --- a/integration-tests/stateful/src/test/java/io/quarkiverse/kafkastreamsprocessor/sample/stateful/ProcessorHealthCheckQuarkusTest.java +++ b/integration-tests/stateful/src/test/java/io/quarkiverse/kafkastreamsprocessor/sample/stateful/ProcessorHealthCheckQuarkusTest.java @@ -39,7 +39,7 @@ import org.junit.jupiter.api.Test; import org.junit.jupiter.api.Timeout; -import io.quarkiverse.kafkastreamsprocessor.api.properties.KStreamsProcessorConfig; +import io.quarkiverse.kafkastreamsprocessor.spi.properties.KStreamsProcessorConfig; import io.quarkiverse.kafkastreamsprocessor.testframework.StateDirCleaningResource; import io.quarkus.test.common.QuarkusTestResource; import io.quarkus.test.junit.QuarkusTest; diff --git a/pom.xml b/pom.xml index 9bb92f3..ddee27c 100644 --- a/pom.xml +++ b/pom.xml @@ -13,6 +13,7 @@ bom test-framework + spi api text-map-accessors impl diff --git a/runtime/pom.xml b/runtime/pom.xml index b4a522b..98af396 100644 --- a/runtime/pom.xml +++ b/runtime/pom.xml @@ -25,7 +25,7 @@ io.quarkiverse.kafkastreamsprocessor - quarkus-kafka-streams-processor-impl + quarkus-kafka-streams-processor-spi io.quarkus @@ -43,6 +43,10 @@ io.quarkus quarkus-micrometer + + io.quarkus + quarkus-smallrye-fault-tolerance + diff --git a/runtime/src/main/java/io/quarkiverse/kafkastreamsprocessor/runtime/KStreamsProcessorConfigRuntime.java b/runtime/src/main/java/io/quarkiverse/kafkastreamsprocessor/runtime/KStreamsProcessorConfigRuntime.java new file mode 100644 index 0000000..8bde6bd --- /dev/null +++ b/runtime/src/main/java/io/quarkiverse/kafkastreamsprocessor/runtime/KStreamsProcessorConfigRuntime.java @@ -0,0 +1,13 @@ +package io.quarkiverse.kafkastreamsprocessor.runtime; + +import io.quarkiverse.kafkastreamsprocessor.spi.properties.KStreamsProcessorConfig; +import io.quarkus.runtime.annotations.ConfigRoot; +import io.smallrye.config.ConfigMapping; + +/** + * Interface to trick the injection of the {@link KStreamsProcessorConfig} in the deployment module + */ +@ConfigMapping(prefix = "kafkastreamsprocessor") +@ConfigRoot +public interface KStreamsProcessorConfigRuntime extends KStreamsProcessorConfig { +} diff --git a/spi/Readme.md b/spi/Readme.md new file mode 100644 index 0000000..273e37c --- /dev/null +++ b/spi/Readme.md @@ -0,0 +1,4 @@ +# SPI + +SPI module for the Kafka Streams Processor library. +It is expected to be put as a compile dependency on the application's classpath. \ No newline at end of file diff --git a/spi/pom.xml b/spi/pom.xml new file mode 100644 index 0000000..dd17620 --- /dev/null +++ b/spi/pom.xml @@ -0,0 +1,46 @@ + + + 4.0.0 + + io.quarkiverse.kafkastreamsprocessor + quarkus-kafka-streams-processor-parent + 1.1.0-SNAPSHOT + + quarkus-kafka-streams-processor-spi + + + + io.quarkiverse.kafkastreamsprocessor + quarkus-kafka-streams-processor-bom + ${project.version} + pom + import + + + + + + + io.quarkiverse.kafkastreamsprocessor + quarkus-kafka-streams-processor-api + + + io.quarkus + quarkus-core + + + io.quarkus + quarkus-builder + + + + org.eclipse.microprofile.config + microprofile-config-api + + + org.projectlombok + lombok + provided + + + diff --git a/spi/src/main/java/io/quarkiverse/kafkastreamsprocessor/spi/TopologyConfigBuildItem.java b/spi/src/main/java/io/quarkiverse/kafkastreamsprocessor/spi/TopologyConfigBuildItem.java new file mode 100644 index 0000000..42345fb --- /dev/null +++ b/spi/src/main/java/io/quarkiverse/kafkastreamsprocessor/spi/TopologyConfigBuildItem.java @@ -0,0 +1,23 @@ +package io.quarkiverse.kafkastreamsprocessor.spi; + +import java.util.Map; + +import io.quarkus.builder.item.SimpleBuildItem; +import lombok.AllArgsConstructor; +import lombok.Getter; + +/** + * A build item that gives access to the topology configured in the extension + */ +@Getter +@AllArgsConstructor +public final class TopologyConfigBuildItem extends SimpleBuildItem { + /** + * Mapping between the Source in the configuration and the associated topics + */ + Map sourceToTopicsMapping; + /** + * Mapping between the Sinks in the configuration and the associated topics + */ + Map sinkToTopicMapping; +} diff --git a/api/src/main/java/io/quarkiverse/kafkastreamsprocessor/api/properties/DlqConfig.java b/spi/src/main/java/io/quarkiverse/kafkastreamsprocessor/spi/properties/DlqConfig.java similarity index 93% rename from api/src/main/java/io/quarkiverse/kafkastreamsprocessor/api/properties/DlqConfig.java rename to spi/src/main/java/io/quarkiverse/kafkastreamsprocessor/spi/properties/DlqConfig.java index e9a045d..53a2f9e 100644 --- a/api/src/main/java/io/quarkiverse/kafkastreamsprocessor/api/properties/DlqConfig.java +++ b/spi/src/main/java/io/quarkiverse/kafkastreamsprocessor/spi/properties/DlqConfig.java @@ -17,7 +17,7 @@ * limitations under the License. * #L% */ -package io.quarkiverse.kafkastreamsprocessor.api.properties; +package io.quarkiverse.kafkastreamsprocessor.spi.properties; import java.util.Optional; diff --git a/api/src/main/java/io/quarkiverse/kafkastreamsprocessor/api/properties/GlobalDlqConfig.java b/spi/src/main/java/io/quarkiverse/kafkastreamsprocessor/spi/properties/GlobalDlqConfig.java similarity index 94% rename from api/src/main/java/io/quarkiverse/kafkastreamsprocessor/api/properties/GlobalDlqConfig.java rename to spi/src/main/java/io/quarkiverse/kafkastreamsprocessor/spi/properties/GlobalDlqConfig.java index 0367b55..ce6d497 100644 --- a/api/src/main/java/io/quarkiverse/kafkastreamsprocessor/api/properties/GlobalDlqConfig.java +++ b/spi/src/main/java/io/quarkiverse/kafkastreamsprocessor/spi/properties/GlobalDlqConfig.java @@ -17,7 +17,7 @@ * limitations under the License. * #L% */ -package io.quarkiverse.kafkastreamsprocessor.api.properties; +package io.quarkiverse.kafkastreamsprocessor.spi.properties; import java.util.Optional; diff --git a/api/src/main/java/io/quarkiverse/kafkastreamsprocessor/api/properties/InputConfig.java b/spi/src/main/java/io/quarkiverse/kafkastreamsprocessor/spi/properties/InputConfig.java similarity index 96% rename from api/src/main/java/io/quarkiverse/kafkastreamsprocessor/api/properties/InputConfig.java rename to spi/src/main/java/io/quarkiverse/kafkastreamsprocessor/spi/properties/InputConfig.java index a9b1592..3ed10b7 100644 --- a/api/src/main/java/io/quarkiverse/kafkastreamsprocessor/api/properties/InputConfig.java +++ b/spi/src/main/java/io/quarkiverse/kafkastreamsprocessor/spi/properties/InputConfig.java @@ -17,7 +17,7 @@ * limitations under the License. * #L% */ -package io.quarkiverse.kafkastreamsprocessor.api.properties; +package io.quarkiverse.kafkastreamsprocessor.spi.properties; import java.util.List; import java.util.Map; diff --git a/api/src/main/java/io/quarkiverse/kafkastreamsprocessor/api/properties/KStreamsProcessorConfig.java b/spi/src/main/java/io/quarkiverse/kafkastreamsprocessor/spi/properties/KStreamsProcessorConfig.java similarity index 96% rename from api/src/main/java/io/quarkiverse/kafkastreamsprocessor/api/properties/KStreamsProcessorConfig.java rename to spi/src/main/java/io/quarkiverse/kafkastreamsprocessor/spi/properties/KStreamsProcessorConfig.java index b23a1e0..8352dcc 100644 --- a/api/src/main/java/io/quarkiverse/kafkastreamsprocessor/api/properties/KStreamsProcessorConfig.java +++ b/spi/src/main/java/io/quarkiverse/kafkastreamsprocessor/spi/properties/KStreamsProcessorConfig.java @@ -18,7 +18,7 @@ * #L% */ -package io.quarkiverse.kafkastreamsprocessor.api.properties; +package io.quarkiverse.kafkastreamsprocessor.spi.properties; import io.quarkiverse.kafkastreamsprocessor.api.exception.RetryableException; import io.smallrye.config.ConfigMapping; diff --git a/api/src/main/java/io/quarkiverse/kafkastreamsprocessor/api/properties/OutputConfig.java b/spi/src/main/java/io/quarkiverse/kafkastreamsprocessor/spi/properties/OutputConfig.java similarity index 95% rename from api/src/main/java/io/quarkiverse/kafkastreamsprocessor/api/properties/OutputConfig.java rename to spi/src/main/java/io/quarkiverse/kafkastreamsprocessor/spi/properties/OutputConfig.java index b2d30e2..4483733 100644 --- a/api/src/main/java/io/quarkiverse/kafkastreamsprocessor/api/properties/OutputConfig.java +++ b/spi/src/main/java/io/quarkiverse/kafkastreamsprocessor/spi/properties/OutputConfig.java @@ -17,7 +17,7 @@ * limitations under the License. * #L% */ -package io.quarkiverse.kafkastreamsprocessor.api.properties; +package io.quarkiverse.kafkastreamsprocessor.spi.properties; import java.util.Map; import java.util.Optional; diff --git a/api/src/main/java/io/quarkiverse/kafkastreamsprocessor/api/properties/RetryConfig.java b/spi/src/main/java/io/quarkiverse/kafkastreamsprocessor/spi/properties/RetryConfig.java similarity index 97% rename from api/src/main/java/io/quarkiverse/kafkastreamsprocessor/api/properties/RetryConfig.java rename to spi/src/main/java/io/quarkiverse/kafkastreamsprocessor/spi/properties/RetryConfig.java index 2167684..6d79827 100644 --- a/api/src/main/java/io/quarkiverse/kafkastreamsprocessor/api/properties/RetryConfig.java +++ b/spi/src/main/java/io/quarkiverse/kafkastreamsprocessor/spi/properties/RetryConfig.java @@ -17,7 +17,7 @@ * limitations under the License. * #L% */ -package io.quarkiverse.kafkastreamsprocessor.api.properties; +package io.quarkiverse.kafkastreamsprocessor.spi.properties; import java.time.temporal.ChronoUnit; import java.util.List; diff --git a/api/src/main/java/io/quarkiverse/kafkastreamsprocessor/api/properties/SinkConfig.java b/spi/src/main/java/io/quarkiverse/kafkastreamsprocessor/spi/properties/SinkConfig.java similarity index 94% rename from api/src/main/java/io/quarkiverse/kafkastreamsprocessor/api/properties/SinkConfig.java rename to spi/src/main/java/io/quarkiverse/kafkastreamsprocessor/spi/properties/SinkConfig.java index 2c279a5..032dcdb 100644 --- a/api/src/main/java/io/quarkiverse/kafkastreamsprocessor/api/properties/SinkConfig.java +++ b/spi/src/main/java/io/quarkiverse/kafkastreamsprocessor/spi/properties/SinkConfig.java @@ -17,7 +17,7 @@ * limitations under the License. * #L% */ -package io.quarkiverse.kafkastreamsprocessor.api.properties; +package io.quarkiverse.kafkastreamsprocessor.spi.properties; /** * Configuration related to a diff --git a/api/src/main/java/io/quarkiverse/kafkastreamsprocessor/api/properties/SourceConfig.java b/spi/src/main/java/io/quarkiverse/kafkastreamsprocessor/spi/properties/SourceConfig.java similarity index 94% rename from api/src/main/java/io/quarkiverse/kafkastreamsprocessor/api/properties/SourceConfig.java rename to spi/src/main/java/io/quarkiverse/kafkastreamsprocessor/spi/properties/SourceConfig.java index 723ebd9..ff34b07 100644 --- a/api/src/main/java/io/quarkiverse/kafkastreamsprocessor/api/properties/SourceConfig.java +++ b/spi/src/main/java/io/quarkiverse/kafkastreamsprocessor/spi/properties/SourceConfig.java @@ -17,7 +17,7 @@ * limitations under the License. * #L% */ -package io.quarkiverse.kafkastreamsprocessor.api.properties; +package io.quarkiverse.kafkastreamsprocessor.spi.properties; import java.util.List; diff --git a/spi/src/main/resources/META-INF/beans.xml b/spi/src/main/resources/META-INF/beans.xml new file mode 100644 index 0000000..e69de29