Skip to content

Commit

Permalink
feat: created a build item + moved configuration in new spi module
Browse files Browse the repository at this point in the history
  • Loading branch information
bsoaressimoes committed Jan 29, 2024
1 parent 493a471 commit bff64d4
Show file tree
Hide file tree
Showing 63 changed files with 284 additions and 189 deletions.

This file was deleted.

This file was deleted.

5 changes: 5 additions & 0 deletions bom/application/pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -33,6 +33,11 @@
<artifactId>quarkus-kafka-streams-processor-impl</artifactId>
<version>${quarkus-kafka-streams-processor.version}</version>
</dependency>
<dependency>
<groupId>io.quarkiverse.kafkastreamsprocessor</groupId>
<artifactId>quarkus-kafka-streams-processor-spi</artifactId>
<version>${quarkus-kafka-streams-processor.version}</version>
</dependency>
<dependency>
<groupId>io.quarkiverse.kafkastreamsprocessor</groupId>
<artifactId>quarkus-kafka-streams-processor-text-map-accessors</artifactId>
Expand Down
2 changes: 1 addition & 1 deletion bom/pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,7 @@
<artifactId>quarkus-kafka-streams-processor-bom-parent</artifactId>
<packaging>pom</packaging>
<properties>
<quarkus-kafka-streams-processor.version>0.0.0-SNAPSHOT</quarkus-kafka-streams-processor.version>
<quarkus-kafka-streams-processor.version>1.1.0-SNAPSHOT</quarkus-kafka-streams-processor.version>
</properties>
<modules>
<module>application</module>
Expand Down
4 changes: 4 additions & 0 deletions deployment/pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,10 @@
<groupId>io.quarkiverse.kafkastreamsprocessor</groupId>
<artifactId>quarkus-kafka-streams-processor-api</artifactId>
</dependency>
<dependency>
<groupId>io.quarkiverse.kafkastreamsprocessor</groupId>
<artifactId>quarkus-kafka-streams-processor-impl</artifactId>
</dependency>
<dependency>
<groupId>io.quarkus</groupId>
<artifactId>quarkus-arc-deployment</artifactId>
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -19,20 +19,26 @@
*/
package io.quarkiverse.kafkastreamsprocessor.kafka.streams.deployment;

import java.util.Map;

import org.eclipse.microprofile.config.Config;
import org.eclipse.microprofile.config.ConfigProvider;
import org.jboss.jandex.AnnotationInstance;
import org.jboss.jandex.AnnotationTarget;
import org.jboss.jandex.DotName;

import io.quarkiverse.kafkastreamsprocessor.api.Processor;
import io.quarkiverse.kafkastreamsprocessor.impl.SinkToTopicMappingBuilder;
import io.quarkiverse.kafkastreamsprocessor.impl.SourceToTopicsMappingBuilder;
import io.quarkiverse.kafkastreamsprocessor.runtime.KStreamsProcessorConfigRuntime;
import io.quarkiverse.kafkastreamsprocessor.spi.TopologyConfigBuildItem;
import io.quarkus.deployment.annotations.BuildProducer;
import io.quarkus.deployment.annotations.BuildStep;
import io.quarkus.deployment.builditem.CombinedIndexBuildItem;
import io.quarkus.deployment.builditem.FeatureBuildItem;
import io.quarkus.deployment.builditem.nativeimage.ReflectiveClassBuildItem;

class KafkaStreamsProcessorProcessor {
public class KafkaStreamsProcessorProcessor {

private static final String FEATURE = "kafka-streams-processor";

Expand Down Expand Up @@ -73,4 +79,16 @@ public void registerRetryExceptions(BuildProducer<ReflectiveClassBuildItem> refl
.fields(false)
.build()));
}

@BuildStep
public void registerTopologyBuildItem(BuildProducer<TopologyConfigBuildItem> configMappingBuildItemProducer,
KStreamsProcessorConfigRuntime kStreamsProcessorConfig) {
Map<String, String[]> sourceToTopicsMapping = new SourceToTopicsMappingBuilder(kStreamsProcessorConfig)
.sourceToTopicsMapping();
Map<String, String> sinkToTopicMapping = new SinkToTopicMappingBuilder(kStreamsProcessorConfig)
.sinkToTopicMapping();

configMappingBuildItemProducer
.produce(new TopologyConfigBuildItem(sourceToTopicsMapping, sinkToTopicMapping));
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -20,11 +20,15 @@
package io.quarkiverse.kafkastreamsprocessor.kafka.streams.test;

import static org.hamcrest.MatcherAssert.assertThat;
import static org.hamcrest.Matchers.arrayContaining;
import static org.hamcrest.Matchers.equalTo;
import static org.hamcrest.Matchers.hasItem;
import static org.hamcrest.Matchers.hasKey;
import static org.junit.jupiter.api.Assertions.assertNotNull;
import static org.junit.jupiter.api.Assertions.assertNull;

import java.util.List;
import java.util.Map;
import java.util.function.Consumer;
import java.util.stream.Collectors;

Expand All @@ -34,6 +38,7 @@
import org.junit.jupiter.api.extension.RegisterExtension;

import io.quarkiverse.kafkastreamsprocessor.api.exception.RetryableException;
import io.quarkiverse.kafkastreamsprocessor.spi.TopologyConfigBuildItem;
import io.quarkus.builder.BuildChainBuilder;
import io.quarkus.deployment.builditem.GeneratedResourceBuildItem;
import io.quarkus.deployment.builditem.nativeimage.ReflectiveClassBuildItem;
Expand All @@ -43,6 +48,8 @@ public class KafkaStreamsProcessorProcessorTest {

private static volatile List<ReflectiveClassBuildItem> registeredClasses;

private static volatile TopologyConfigBuildItem topologyConfigBuildItem;

@RegisterExtension
static QuarkusUnitTest runner = new QuarkusUnitTest()
.setArchiveProducer(() -> ShrinkWrap.create(JavaArchive.class)
Expand All @@ -56,9 +63,12 @@ private static Consumer<BuildChainBuilder> buildCustomizer() {
return chainBuilder -> chainBuilder.addBuildStep(
context -> {
registeredClasses = context.consumeMulti(ReflectiveClassBuildItem.class);
topologyConfigBuildItem = context.consume(TopologyConfigBuildItem.class);
checkProperClassesAreRegistered();
checkTopologyConfigurationBuildItem();
})
.consumes(ReflectiveClassBuildItem.class)
.consumes(TopologyConfigBuildItem.class)
.produces(GeneratedResourceBuildItem.class)
.build();
}
Expand All @@ -75,6 +85,13 @@ private static void checkProperClassesAreRegistered() {
assertThat(allRegisteredClasses, hasItem(RetryableException.class.getName()));
}

private static void checkTopologyConfigurationBuildItem() {
assertThat(topologyConfigBuildItem.getSinkToTopicMapping(), equalTo(Map.of("emitter-channel", "pong-events")));
assertThat(topologyConfigBuildItem.getSourceToTopicsMapping(), hasKey("receiver-channel"));
assertThat(topologyConfigBuildItem.getSourceToTopicsMapping().get("receiver-channel"),
arrayContaining(equalTo("ping-events")));
}

@Test
void shouldRegisterTypesForReflection() {
// if it gets there, it succeeded
Expand Down
4 changes: 4 additions & 0 deletions impl/pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -33,6 +33,10 @@
<groupId>io.quarkiverse.kafkastreamsprocessor</groupId>
<artifactId>quarkus-kafka-streams-processor-api</artifactId>
</dependency>
<dependency>
<groupId>io.quarkiverse.kafkastreamsprocessor</groupId>
<artifactId>quarkus-kafka-streams-processor-spi</artifactId>
</dependency>
<dependency>
<groupId>io.quarkiverse.kafkastreamsprocessor</groupId>
<artifactId>quarkus-kafka-streams-processor-text-map-accessors</artifactId>
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -26,12 +26,26 @@
import jakarta.enterprise.context.ApplicationScoped;
import jakarta.inject.Inject;

import io.quarkiverse.kafkastreamsprocessor.api.SinkToTopicMappingBuilder;
import io.quarkiverse.kafkastreamsprocessor.api.properties.KStreamsProcessorConfig;
import io.quarkiverse.kafkastreamsprocessor.api.properties.SinkConfig;
import io.quarkiverse.kafkastreamsprocessor.spi.properties.KStreamsProcessorConfig;
import io.quarkiverse.kafkastreamsprocessor.spi.properties.SinkConfig;
import lombok.extern.slf4j.Slf4j;

/**
* Object to inject to get access to the resolved mapping between sink and topic for a multi output processor, using the
* conventions set up by the framework based on config properties like:
*
* <pre>
* kafkastreamsprocessor.output.sinks.pong.topic=pong-events
* kafkastreamsprocessor.output.sinks.pang.topic=pang-events
* </pre>
* <p>
* Where:
* </p>
* <ul>
* <li>pong and pang are the sinks</li>
* <li>pong-events and pang-events the Kafka topics</li>
* </ul>
*
* Multi-output topic configuration.
* <p>
* Inspired by <a href=
Expand All @@ -44,7 +58,11 @@
*/
@ApplicationScoped
@Slf4j
public class DefaultSinkToTopicMappingBuilder implements SinkToTopicMappingBuilder {
public class SinkToTopicMappingBuilder {
/**
* Default sink name created by KafkaStreams if no sink is configured manually
*/
String DEFAULT_SINK_NAME = "emitter-channel";
/**
* Configuration object of the extension
*/
Expand All @@ -57,14 +75,19 @@ public class DefaultSinkToTopicMappingBuilder implements SinkToTopicMappingBuild
* Configuration object of the extension
*/
@Inject
public DefaultSinkToTopicMappingBuilder(KStreamsProcessorConfig extensionConfiguration) {
public SinkToTopicMappingBuilder(KStreamsProcessorConfig extensionConfiguration) {
this.extensionConfiguration = extensionConfiguration;
}

/**
* {@inheritDoc}
* Looks at the configuration and extracts from it the mapping from the sink to the Kafka topic.
* <p>
* This method is exposed so you can do any kind of technical postprocessing based on the Kafka topic and the sink
* names.
* </p>
*
* @return a map with keys the sink names and values the corresponding Kafka topic name
*/
@Override
public Map<String, String> sinkToTopicMapping() {
// Extract topic name for each sink if any has been configured
Map<String, String> sinkToTopicMapping = buildMapping();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -27,19 +27,38 @@
import jakarta.enterprise.context.ApplicationScoped;
import jakarta.inject.Inject;

import io.quarkiverse.kafkastreamsprocessor.api.SourceToTopicsMappingBuilder;
import io.quarkiverse.kafkastreamsprocessor.api.properties.KStreamsProcessorConfig;
import io.quarkiverse.kafkastreamsprocessor.api.properties.SourceConfig;
import io.quarkiverse.kafkastreamsprocessor.spi.properties.KStreamsProcessorConfig;
import io.quarkiverse.kafkastreamsprocessor.spi.properties.SourceConfig;
import lombok.extern.slf4j.Slf4j;

/**
* Object to inject to get access to the resolved mapping between input topics and sources for a multi input processor, using
* the
* conventions set up by the framework based on config properties like:
*
* <pre>
* kafkastreamsprocessor.input.sources.pong.topics=pong-events
* kafkastreamsprocessor.input.sources.pang.topics=pang-events,ping-events
* </pre>
* <p>
* Where:
* </p>
* <ul>
* <li>pong and pang are the sources</li>
* <li>ping-events, pong-events and pang-events the Kafka topics</li>
* </ul>
*
* Multi-input topic configuration Inspired by <a href=
* "https://github.com/smallrye/smallrye-reactive-messaging/blob/main/smallrye-reactive-messaging-provider/src/main/java/io/smallrye/reactive/messaging/impl/ConfiguredChannelFactory.java">smallrye-reactive-messaging
* ConfiguredChannelFactory</a>
*/
@ApplicationScoped
@Slf4j
public class DefaultSourceToTopicsMappingBuilder implements SourceToTopicsMappingBuilder {
public class SourceToTopicsMappingBuilder {
/**
* Default source name created by KafkaStreams if no source is configured manually
*/
String DEFAULT_SOURCE_NAME = "receiver-channel";
/**
* Configuration of the extension
*/
Expand All @@ -52,14 +71,19 @@ public class DefaultSourceToTopicsMappingBuilder implements SourceToTopicsMappin
* Configuration of the extension
*/
@Inject
public DefaultSourceToTopicsMappingBuilder(KStreamsProcessorConfig extensionConfiguration) {
public SourceToTopicsMappingBuilder(KStreamsProcessorConfig extensionConfiguration) {
this.extensionConfiguration = extensionConfiguration;
}

/**
* {@inheritDoc}
* Looks at the configuration and extracts from it the mapping from the source to the Kafka topic(s).
* <p>
* This method is exposed so you can do any kind of technical postprocessing based on the Kafka topic and the source
* names.
* </p>
*
* @return a map with keys the sink names and values the corresponding list of Kafka topic names
*/
@Override
public Map<String, String[]> sourceToTopicsMapping() {
// Extract topic name for each channel
Map<String, String[]> sourceToTopicMapping = buildMapping();
Expand Down
Loading

0 comments on commit bff64d4

Please sign in to comment.