diff --git a/bom/test/pom.xml b/bom/test/pom.xml
index ea13a48..06d1696 100644
--- a/bom/test/pom.xml
+++ b/bom/test/pom.xml
@@ -9,7 +9,7 @@
quarkus-kafka-streams-processor-test-bom
pom
- 3.6.2
+ 3.7.1
5.15.0
@@ -43,12 +43,12 @@
io.rest-assured
json-path
- 5.4.0
+ 5.5.0
org.springframework.kafka
spring-kafka-test
- 3.1.3
+ 3.2.4
org.apache.kafka
diff --git a/impl/src/test/java/io/quarkiverse/kafkastreamsprocessor/impl/TopologyProducerTest.java b/impl/src/test/java/io/quarkiverse/kafkastreamsprocessor/impl/TopologyProducerTest.java
index 2e23da5..f7ecd9d 100644
--- a/impl/src/test/java/io/quarkiverse/kafkastreamsprocessor/impl/TopologyProducerTest.java
+++ b/impl/src/test/java/io/quarkiverse/kafkastreamsprocessor/impl/TopologyProducerTest.java
@@ -115,7 +115,8 @@ private TopologyProducer newTopologyProducer(
when(dlqConfig.topic()).thenReturn(Optional.ofNullable(dlq));
when(sourceToTopicsMappingBuilder.sourceToTopicsMapping()).thenReturn(sourceToTopicMapping);
when(sinkToTopicMappingBuilder.sinkToTopicMapping()).thenReturn(sinkToTopicMapping);
- TopologyProducer topologyProducer = new TopologyProducer(kStreamsProcessorConfig, configCustomizer, sourceToTopicsMappingBuilder, sinkToTopicMappingBuilder, interceptors);
+ TopologyProducer topologyProducer = new TopologyProducer(kStreamsProcessorConfig, configCustomizer,
+ sourceToTopicsMappingBuilder, sinkToTopicMappingBuilder, interceptors);
return topologyProducer;
}
diff --git a/impl/src/test/java/io/quarkiverse/kafkastreamsprocessor/impl/configuration/DefaultTopologySerdesConfigurationTest.java b/impl/src/test/java/io/quarkiverse/kafkastreamsprocessor/impl/configuration/DefaultTopologySerdesConfigurationTest.java
index 13e06ec..7fa0b06 100644
--- a/impl/src/test/java/io/quarkiverse/kafkastreamsprocessor/impl/configuration/DefaultTopologySerdesConfigurationTest.java
+++ b/impl/src/test/java/io/quarkiverse/kafkastreamsprocessor/impl/configuration/DefaultTopologySerdesConfigurationTest.java
@@ -55,7 +55,7 @@ void setup() {
customizer = new DefaultTopologySerdesConfiguration(objectMapper);
}
- @Test
+ @Test
void shouldSetIntrospectionSerializerByDefault() {
when(configuration.getProcessorPayloadType()).thenReturn((Class) JSonPojo.class);
when(configuration.getSinkValueSerializer()).thenReturn(null);
@@ -74,16 +74,16 @@ void shouldSetCustomizedSinkSerializerAndValueSerde() {
assertThat(configuration.getSourceValueSerde(), is(equalTo(customSerde)));
}
- @Test
+ @Test
void shouldSetProtobufSerdeWhenProcessorIsProtobuf() {
when(configuration.getProcessorPayloadType()).thenReturn((Class) PingMessage.Ping.class);
customizer.apply(configuration);
verify(configuration).setSourceValueSerde(isA(KafkaProtobufSerde.class));
}
- @Test
+ @Test
void shouldSetJacksonSerdeWhenProcessorIsAPojo() {
- when(configuration.getProcessorPayloadType()).thenReturn((Class)JSonPojo.class);
+ when(configuration.getProcessorPayloadType()).thenReturn((Class) JSonPojo.class);
customizer.apply(configuration);
verify(configuration).setSourceValueSerde(isA(JacksonSerde.class));
}
diff --git a/impl/src/test/java/io/quarkiverse/kafkastreamsprocessor/impl/decorator/processor/CdiRequestContextDecoratorTest.java b/impl/src/test/java/io/quarkiverse/kafkastreamsprocessor/impl/decorator/processor/CdiRequestContextDecoratorTest.java
index 3264e55..1f371a8 100644
--- a/impl/src/test/java/io/quarkiverse/kafkastreamsprocessor/impl/decorator/processor/CdiRequestContextDecoratorTest.java
+++ b/impl/src/test/java/io/quarkiverse/kafkastreamsprocessor/impl/decorator/processor/CdiRequestContextDecoratorTest.java
@@ -52,11 +52,11 @@ class CdiRequestContextDecoratorTest {
@Mock
ManagedContext requestContext;
- @BeforeEach
- public void setup() {
- when(container.requestContext()).thenReturn(requestContext);
- decorator = new CdiRequestContextDecorator<>(processor, container);
- }
+ @BeforeEach
+ public void setup() {
+ when(container.requestContext()).thenReturn(requestContext);
+ decorator = new CdiRequestContextDecorator<>(processor, container);
+ }
@Test
public void shouldActivateDeactivateWhenProcessIsCalled() {
@@ -66,10 +66,10 @@ public void shouldActivateDeactivateWhenProcessIsCalled() {
verify(requestContext).terminate();
}
- @Test
- public void shouldNotActivateRequestScopeIfAlreadyActivated() {
- when(requestContext.isActive()).thenReturn(true);
- decorator.process(new Record<>("Hello", "World", 0L));
- verify(requestContext, never()).activate();
- }
+ @Test
+ public void shouldNotActivateRequestScopeIfAlreadyActivated() {
+ when(requestContext.isActive()).thenReturn(true);
+ decorator.process(new Record<>("Hello", "World", 0L));
+ verify(requestContext, never()).activate();
+ }
}
diff --git a/impl/src/test/java/io/quarkiverse/kafkastreamsprocessor/impl/decorator/processor/PunctuatorDecorationProcessorDecoratorTest.java b/impl/src/test/java/io/quarkiverse/kafkastreamsprocessor/impl/decorator/processor/PunctuatorDecorationProcessorDecoratorTest.java
index 9a5ffa5..5187037 100644
--- a/impl/src/test/java/io/quarkiverse/kafkastreamsprocessor/impl/decorator/processor/PunctuatorDecorationProcessorDecoratorTest.java
+++ b/impl/src/test/java/io/quarkiverse/kafkastreamsprocessor/impl/decorator/processor/PunctuatorDecorationProcessorDecoratorTest.java
@@ -53,7 +53,7 @@ class PunctuatorDecorationProcessorDecoratorTest {
@Mock
InternalProcessorContext context;
- @Test
+ @Test
void contextWrapped() {
when(decoratedPunctuators.get()).thenReturn(decoratedPunctuator);
Processor processor = new Processor<>() {
@@ -68,14 +68,14 @@ public void process(Record record) {
}
};
- PunctuatorDecorationProcessorDecorator decorator = new PunctuatorDecorationProcessorDecorator<>(
- processor,
- decoratedPunctuators);
- decorator.init(context);
- decorator.process(new Record<>("blabla",PingMessage.Ping.newBuilder().setMessage("blabla").build(),0L,null));
- decorator.close();
+ PunctuatorDecorationProcessorDecorator decorator = new PunctuatorDecorationProcessorDecorator<>(
+ processor,
+ decoratedPunctuators);
+ decorator.init(context);
+ decorator.process(new Record<>("blabla", PingMessage.Ping.newBuilder().setMessage("blabla").build(), 0L, null));
+ decorator.close();
- verify(context).schedule(Duration.ofSeconds(10), PunctuationType.WALL_CLOCK_TIME, decoratedPunctuator);
+ verify(context).schedule(Duration.ofSeconds(10), PunctuationType.WALL_CLOCK_TIME, decoratedPunctuator);
verify(decoratedPunctuator).setRealPunctuatorInstance(punctuator);
}
diff --git a/impl/src/test/java/io/quarkiverse/kafkastreamsprocessor/impl/errors/LogAndSendToDlqExceptionHandlerDelegateTest.java b/impl/src/test/java/io/quarkiverse/kafkastreamsprocessor/impl/errors/LogAndSendToDlqExceptionHandlerDelegateTest.java
index e0a8c5b..40ffee6 100644
--- a/impl/src/test/java/io/quarkiverse/kafkastreamsprocessor/impl/errors/LogAndSendToDlqExceptionHandlerDelegateTest.java
+++ b/impl/src/test/java/io/quarkiverse/kafkastreamsprocessor/impl/errors/LogAndSendToDlqExceptionHandlerDelegateTest.java
@@ -97,37 +97,37 @@ class LogAndSendToDlqExceptionHandlerDelegateTest {
@BeforeEach
void setUp() {
- when(kStreamsProcessorConfig.dlq()).thenReturn(dlqConfig);
+ when(kStreamsProcessorConfig.dlq()).thenReturn(dlqConfig);
}
- @Test
- void shouldNotBlockAndSendToDlqIfPossible() {
- when(record.key()).thenReturn(RECORD_KEY_BYTES);
- when(record.value()).thenReturn(RECORD_VALUE_BYTES);
- when(record.timestamp()).thenReturn(RECORD_TIMESTAMP);
- when(record.topic()).thenReturn(INPUT_TOPIC);
- when(record.partition()).thenReturn(PARTITION);
- when(record.headers()).thenReturn(headers);
- when(dlqConfig.topic()).thenReturn(Optional.of(DLQ_TOPIC));
- when(kStreamsProcessorConfig.errorStrategy()).thenReturn(ErrorHandlingStrategy.DEAD_LETTER_QUEUE);
- RecordHeaders headersWithMetadata = new RecordHeaders();
- when(metadataHandler.withMetadata(any(Headers.class), anyString(), anyInt(), any(Exception.class)))
- .thenReturn(headersWithMetadata);
- when(kafkaClientSupplier.getProducer(any())).thenReturn(dlqProducerMock);
-
- handler = new LogAndSendToDlqExceptionHandlerDelegate(kafkaClientSupplier, metrics, metadataHandler,
- kStreamsProcessorConfig);
- handler.configure(Collections.emptyMap());
-
- DeserializationHandlerResponse response = handler.handle(context, record, exception);
- assertEquals(DeserializationHandlerResponse.CONTINUE, response);
-
- verify(dlqProducerMock)
- .send(eq(new ProducerRecord<>(DLQ_TOPIC, null, RECORD_TIMESTAMP, RECORD_KEY_BYTES, RECORD_VALUE_BYTES,
- headersWithMetadata)), any(Callback.class));
- verify(metadataHandler).withMetadata(eq(headers), eq(INPUT_TOPIC), eq(PARTITION), eq(exception));
- assertThat(metrics.processorErrorCounter().count(), closeTo(1d, 0.01d));
- }
+ @Test
+ void shouldNotBlockAndSendToDlqIfPossible() {
+ when(record.key()).thenReturn(RECORD_KEY_BYTES);
+ when(record.value()).thenReturn(RECORD_VALUE_BYTES);
+ when(record.timestamp()).thenReturn(RECORD_TIMESTAMP);
+ when(record.topic()).thenReturn(INPUT_TOPIC);
+ when(record.partition()).thenReturn(PARTITION);
+ when(record.headers()).thenReturn(headers);
+ when(dlqConfig.topic()).thenReturn(Optional.of(DLQ_TOPIC));
+ when(kStreamsProcessorConfig.errorStrategy()).thenReturn(ErrorHandlingStrategy.DEAD_LETTER_QUEUE);
+ RecordHeaders headersWithMetadata = new RecordHeaders();
+ when(metadataHandler.withMetadata(any(Headers.class), anyString(), anyInt(), any(Exception.class)))
+ .thenReturn(headersWithMetadata);
+ when(kafkaClientSupplier.getProducer(any())).thenReturn(dlqProducerMock);
+
+ handler = new LogAndSendToDlqExceptionHandlerDelegate(kafkaClientSupplier, metrics, metadataHandler,
+ kStreamsProcessorConfig);
+ handler.configure(Collections.emptyMap());
+
+ DeserializationHandlerResponse response = handler.handle(context, record, exception);
+ assertEquals(DeserializationHandlerResponse.CONTINUE, response);
+
+ verify(dlqProducerMock)
+ .send(eq(new ProducerRecord<>(DLQ_TOPIC, null, RECORD_TIMESTAMP, RECORD_KEY_BYTES, RECORD_VALUE_BYTES,
+ headersWithMetadata)), any(Callback.class));
+ verify(metadataHandler).withMetadata(eq(headers), eq(INPUT_TOPIC), eq(PARTITION), eq(exception));
+ assertThat(metrics.processorErrorCounter().count(), closeTo(1d, 0.01d));
+ }
@Test
void shouldOnlyContinueIfDefaultErrorStrategy() {
@@ -136,7 +136,7 @@ void shouldOnlyContinueIfDefaultErrorStrategy() {
when(kStreamsProcessorConfig.dlq()).thenReturn(dlqConfig);
when(dlqConfig.topic()).thenReturn(Optional.of(DLQ_TOPIC));
handler = new LogAndSendToDlqExceptionHandlerDelegate(kafkaClientSupplier, metrics, metadataHandler,
- kStreamsProcessorConfig);
+ kStreamsProcessorConfig);
handler.configure(Collections.emptyMap());
DeserializationHandlerResponse response = handler.handle(context, record, exception);
@@ -152,7 +152,7 @@ void shouldFailFastIfDlqStrategyWithoutTopic() {
when(kStreamsProcessorConfig.dlq()).thenReturn(dlqConfig);
when(kStreamsProcessorConfig.errorStrategy()).thenReturn(ErrorHandlingStrategy.DEAD_LETTER_QUEUE);
handler = new LogAndSendToDlqExceptionHandlerDelegate(kafkaClientSupplier, metrics, metadataHandler,
- kStreamsProcessorConfig);
+ kStreamsProcessorConfig);
assertThrows(IllegalStateException.class, () -> handler.configure(Collections.emptyMap()));
}
diff --git a/impl/src/test/java/io/quarkiverse/kafkastreamsprocessor/impl/health/RebalancingTopicHealthCheckTest.java b/impl/src/test/java/io/quarkiverse/kafkastreamsprocessor/impl/health/RebalancingTopicHealthCheckTest.java
index 933a8cc..4b9a4dc 100644
--- a/impl/src/test/java/io/quarkiverse/kafkastreamsprocessor/impl/health/RebalancingTopicHealthCheckTest.java
+++ b/impl/src/test/java/io/quarkiverse/kafkastreamsprocessor/impl/health/RebalancingTopicHealthCheckTest.java
@@ -45,13 +45,13 @@ public void setUp() {
healthCheck.kafkaStreams = streams;
}
- @Test
- public void testStateRunningHealthOK() {
- when(streams.state()).thenReturn(KafkaStreams.State.RUNNING);
- HealthCheckResponse response = healthCheck.call();
- assertThat(response.getStatus(), equalTo(HealthCheckResponse.Status.UP));
- assertThat(response.getData().get().get("state"), equalTo("RUNNING"));
- }
+ @Test
+ public void testStateRunningHealthOK() {
+ when(streams.state()).thenReturn(KafkaStreams.State.RUNNING);
+ HealthCheckResponse response = healthCheck.call();
+ assertThat(response.getStatus(), equalTo(HealthCheckResponse.Status.UP));
+ assertThat(response.getData().get().get("state"), equalTo("RUNNING"));
+ }
@Test
public void testUndefinedStateHealthKO() {
@@ -60,13 +60,13 @@ public void testUndefinedStateHealthKO() {
assertThat(response.getData().get().get("state"), nullValue());
}
- @Test
- public void testStateRebalancingHealthKO() {
- when(streams.state()).thenReturn(KafkaStreams.State.REBALANCING);
- HealthCheckResponse response = healthCheck.call();
- assertThat(response.getStatus(), equalTo(HealthCheckResponse.Status.DOWN));
- assertThat(response.getData().get().get("state"), equalTo("REBALANCING"));
- }
+ @Test
+ public void testStateRebalancingHealthKO() {
+ when(streams.state()).thenReturn(KafkaStreams.State.REBALANCING);
+ HealthCheckResponse response = healthCheck.call();
+ assertThat(response.getStatus(), equalTo(HealthCheckResponse.Status.DOWN));
+ assertThat(response.getData().get().get("state"), equalTo("REBALANCING"));
+ }
@Test
public void testStateKOIfKakfaStreamsNotInjected() {
diff --git a/integration-tests/kafka-to-rest/src/test/java/io/quarkiverse/kafkastreamsprocessor/sample/kafkatorest/PingClientProcessorQuarkusWithRetryTest.java b/integration-tests/kafka-to-rest/src/test/java/io/quarkiverse/kafkastreamsprocessor/sample/kafkatorest/PingClientProcessorQuarkusWithRetryTest.java
index 1bf213c..8fec6bf 100644
--- a/integration-tests/kafka-to-rest/src/test/java/io/quarkiverse/kafkastreamsprocessor/sample/kafkatorest/PingClientProcessorQuarkusWithRetryTest.java
+++ b/integration-tests/kafka-to-rest/src/test/java/io/quarkiverse/kafkastreamsprocessor/sample/kafkatorest/PingClientProcessorQuarkusWithRetryTest.java
@@ -90,24 +90,26 @@ public void tearDown() {
consumer.close(Durations.ONE_SECOND);
}
- @Test
- void singleMessageWithRetry() {
- // response "PONG"
- when(client.ping())
- .thenThrow(mock(RetryableException.class))
- .thenThrow(mock(RetryableException.class))
- .thenThrow(mock(RetryableException.class))
- .thenReturn("PONG");
-
- producer.send(new ProducerRecord<>(kStreamsProcessorConfig.input().topic().get(), Ping.newBuilder().setMessage("hello").build()));
- producer.flush();
-
- ConsumerRecord singleRecord = KafkaTestUtils.getSingleRecord(consumer, kStreamsProcessorConfig.output().topic().get(),
- Durations.TEN_SECONDS);
- consumer.commitSync();
-
- assertEquals("PONG of hello", singleRecord.value().getMessage());
- verify(client, times(4)).ping(); // 3 retry + 1
- }
+ @Test
+ void singleMessageWithRetry() {
+ // response "PONG"
+ when(client.ping())
+ .thenThrow(mock(RetryableException.class))
+ .thenThrow(mock(RetryableException.class))
+ .thenThrow(mock(RetryableException.class))
+ .thenReturn("PONG");
+
+ producer.send(new ProducerRecord<>(kStreamsProcessorConfig.input().topic().get(),
+ Ping.newBuilder().setMessage("hello").build()));
+ producer.flush();
+
+ ConsumerRecord singleRecord = KafkaTestUtils.getSingleRecord(consumer,
+ kStreamsProcessorConfig.output().topic().get(),
+ Durations.TEN_SECONDS);
+ consumer.commitSync();
+
+ assertEquals("PONG of hello", singleRecord.value().getMessage());
+ verify(client, times(4)).ping(); // 3 retry + 1
+ }
}
diff --git a/integration-tests/kafka-to-rest/src/test/java/io/quarkiverse/kafkastreamsprocessor/sample/kafkatorest/PingClientProcessorQuarkusWithoutRetryCatchTest.java b/integration-tests/kafka-to-rest/src/test/java/io/quarkiverse/kafkastreamsprocessor/sample/kafkatorest/PingClientProcessorQuarkusWithoutRetryCatchTest.java
index 941a82f..38d72f5 100644
--- a/integration-tests/kafka-to-rest/src/test/java/io/quarkiverse/kafkastreamsprocessor/sample/kafkatorest/PingClientProcessorQuarkusWithoutRetryCatchTest.java
+++ b/integration-tests/kafka-to-rest/src/test/java/io/quarkiverse/kafkastreamsprocessor/sample/kafkatorest/PingClientProcessorQuarkusWithoutRetryCatchTest.java
@@ -86,18 +86,19 @@ public void tearDown() {
consumer.close(Durations.ONE_SECOND);
}
- @Test
- void singleMessageWithoutRetry() {
- // throw Exception
- when(client.ping())
- .thenThrow(mock(RuntimeException.class));
-
- producer.send(new ProducerRecord<>(kStreamsProcessorConfig.input().topic().get(), Ping.newBuilder().setMessage("hello").build()));
- producer.flush();
-
- assertThrows(IllegalStateException.class, () -> {
- KafkaTestUtils.getSingleRecord(consumer, kStreamsProcessorConfig.output().topic().get(),
- Durations.TEN_SECONDS);
- });
- }
+ @Test
+ void singleMessageWithoutRetry() {
+ // throw Exception
+ when(client.ping())
+ .thenThrow(mock(RuntimeException.class));
+
+ producer.send(new ProducerRecord<>(kStreamsProcessorConfig.input().topic().get(),
+ Ping.newBuilder().setMessage("hello").build()));
+ producer.flush();
+
+ assertThrows(IllegalStateException.class, () -> {
+ KafkaTestUtils.getSingleRecord(consumer, kStreamsProcessorConfig.output().topic().get(),
+ Durations.TEN_SECONDS);
+ });
+ }
}
diff --git a/integration-tests/kafka-to-rest/src/test/java/io/quarkiverse/kafkastreamsprocessor/sample/kafkatorest/PingClientProcessorTest.java b/integration-tests/kafka-to-rest/src/test/java/io/quarkiverse/kafkastreamsprocessor/sample/kafkatorest/PingClientProcessorTest.java
index c408af1..fadcfbf 100644
--- a/integration-tests/kafka-to-rest/src/test/java/io/quarkiverse/kafkastreamsprocessor/sample/kafkatorest/PingClientProcessorTest.java
+++ b/integration-tests/kafka-to-rest/src/test/java/io/quarkiverse/kafkastreamsprocessor/sample/kafkatorest/PingClientProcessorTest.java
@@ -61,14 +61,14 @@ public void setup() {
processor.init(context);
}
- @Test
- public void replies_with_pong() {
- when(client.ping()).thenReturn("world");
- when(context.recordMetadata()).thenReturn(Optional.of(recordMetadata));
+ @Test
+ public void replies_with_pong() {
+ when(client.ping()).thenReturn("world");
+ when(context.recordMetadata()).thenReturn(Optional.of(recordMetadata));
- processor.process(new Record<>("key", PingMessage.Ping.newBuilder().setMessage("hello").build(), 0L));
+ processor.process(new Record<>("key", PingMessage.Ping.newBuilder().setMessage("hello").build(), 0L));
- verify(context).forward(captor.capture());
- assertEquals("world of hello", captor.getValue().value().getMessage());
- }
+ verify(context).forward(captor.capture());
+ assertEquals("world of hello", captor.getValue().value().getMessage());
+ }
}
diff --git a/pom.xml b/pom.xml
index 284b435..37c54d4 100644
--- a/pom.xml
+++ b/pom.xml
@@ -4,7 +4,7 @@
io.quarkiverse
quarkiverse-parent
- 16
+ 18
io.quarkiverse.kafkastreamsprocessor
quarkus-kafka-streams-processor-parent
@@ -32,9 +32,9 @@
17
UTF-8
UTF-8
- 3.10.0
+ 3.15.1
- 3.24.1
+ 3.25.5
diff --git a/spi/src/test/java/io/quarkiverse/kafkastreamsprocessor/spi/KStreamsProcessorConfigGeneratorTest.java b/spi/src/test/java/io/quarkiverse/kafkastreamsprocessor/spi/KStreamsProcessorConfigGeneratorTest.java
index 4ba357e..4f1e7bc 100644
--- a/spi/src/test/java/io/quarkiverse/kafkastreamsprocessor/spi/KStreamsProcessorConfigGeneratorTest.java
+++ b/spi/src/test/java/io/quarkiverse/kafkastreamsprocessor/spi/KStreamsProcessorConfigGeneratorTest.java
@@ -27,15 +27,15 @@ class KStreamsProcessorConfigGeneratorTest {
@Mock
Config config;
- private void mockProperties(Map keyValues) {
- when(config.getPropertyNames()).thenReturn(keyValues.keySet());
- keyValues.forEach((k, v) -> {
- lenient().when(config.getValue(k, String.class)).thenReturn(v);
- lenient().when(config.getOptionalValue(k, String.class)).thenReturn(Optional.of(v));
- lenient().when(config.getValues(k, String.class)).thenReturn(List.of(v.split(",")));
- lenient().when(config.getOptionalValues(k, String.class)).thenReturn(Optional.of(List.of(v.split(","))));
- });
- }
+ private void mockProperties(Map keyValues) {
+ when(config.getPropertyNames()).thenReturn(keyValues.keySet());
+ keyValues.forEach((k, v) -> {
+ lenient().when(config.getValue(k, String.class)).thenReturn(v);
+ lenient().when(config.getOptionalValue(k, String.class)).thenReturn(Optional.of(v));
+ lenient().when(config.getValues(k, String.class)).thenReturn(List.of(v.split(",")));
+ lenient().when(config.getOptionalValues(k, String.class)).thenReturn(Optional.of(List.of(v.split(","))));
+ });
+ }
@Test
void singleSink() {
diff --git a/spi/src/test/java/io/quarkiverse/kafkastreamsprocessor/spi/SinkToTopicMappingBuilderFromConfigTest.java b/spi/src/test/java/io/quarkiverse/kafkastreamsprocessor/spi/SinkToTopicMappingBuilderFromConfigTest.java
index 7619b03..76fe0aa 100644
--- a/spi/src/test/java/io/quarkiverse/kafkastreamsprocessor/spi/SinkToTopicMappingBuilderFromConfigTest.java
+++ b/spi/src/test/java/io/quarkiverse/kafkastreamsprocessor/spi/SinkToTopicMappingBuilderFromConfigTest.java
@@ -23,15 +23,15 @@ public class SinkToTopicMappingBuilderFromConfigTest {
@Mock
Config config;
- private void mockProperties(Map keyValues) {
- when(config.getPropertyNames()).thenReturn(keyValues.keySet());
- keyValues.forEach((k, v) -> {
- lenient().when(config.getValue(k, String.class)).thenReturn(v);
- lenient().when(config.getOptionalValue(k, String.class)).thenReturn(Optional.of(v));
- lenient().when(config.getValues(k, String.class)).thenReturn(List.of(v.split(",")));
- lenient().when(config.getOptionalValues(k, String.class)).thenReturn(Optional.of(List.of(v.split(","))));
- });
- }
+ private void mockProperties(Map keyValues) {
+ when(config.getPropertyNames()).thenReturn(keyValues.keySet());
+ keyValues.forEach((k, v) -> {
+ lenient().when(config.getValue(k, String.class)).thenReturn(v);
+ lenient().when(config.getOptionalValue(k, String.class)).thenReturn(Optional.of(v));
+ lenient().when(config.getValues(k, String.class)).thenReturn(List.of(v.split(",")));
+ lenient().when(config.getOptionalValues(k, String.class)).thenReturn(Optional.of(List.of(v.split(","))));
+ });
+ }
@Test
void sinkToTopicMapping_whenSingleSink_shouldGenerateMapping() {
diff --git a/spi/src/test/java/io/quarkiverse/kafkastreamsprocessor/spi/SinkToTopicMappingBuilderTest.java b/spi/src/test/java/io/quarkiverse/kafkastreamsprocessor/spi/SinkToTopicMappingBuilderTest.java
index 5358a81..07a3a26 100644
--- a/spi/src/test/java/io/quarkiverse/kafkastreamsprocessor/spi/SinkToTopicMappingBuilderTest.java
+++ b/spi/src/test/java/io/quarkiverse/kafkastreamsprocessor/spi/SinkToTopicMappingBuilderTest.java
@@ -62,9 +62,9 @@ private void configureSink(String name, String topic) {
sinks.put(name, sinkConfig);
}
- private void configureTopic(String topic) {
- when(outputConfig.topic()).thenReturn(Optional.of(topic));
- }
+ private void configureTopic(String topic) {
+ when(outputConfig.topic()).thenReturn(Optional.of(topic));
+ }
@Test
void sinkToTopicMapping_whenSingleSink_shouldGenerateMapping() {
diff --git a/spi/src/test/java/io/quarkiverse/kafkastreamsprocessor/spi/SourceToTopicsMappingBuilderFromConfigTest.java b/spi/src/test/java/io/quarkiverse/kafkastreamsprocessor/spi/SourceToTopicsMappingBuilderFromConfigTest.java
index c5d425a..329ef83 100644
--- a/spi/src/test/java/io/quarkiverse/kafkastreamsprocessor/spi/SourceToTopicsMappingBuilderFromConfigTest.java
+++ b/spi/src/test/java/io/quarkiverse/kafkastreamsprocessor/spi/SourceToTopicsMappingBuilderFromConfigTest.java
@@ -27,15 +27,15 @@ public class SourceToTopicsMappingBuilderFromConfigTest {
@Mock
Config config;
- private void mockProperties(Map keyValues) {
- when(config.getPropertyNames()).thenReturn(keyValues.keySet());
- keyValues.forEach((k, v) -> {
- lenient().when(config.getValue(k, String.class)).thenReturn(v);
- lenient().when(config.getOptionalValue(k, String.class)).thenReturn(Optional.of(v));
- lenient().when(config.getValues(k, String.class)).thenReturn(List.of(v.split(",")));
- lenient().when(config.getOptionalValues(k, String.class)).thenReturn(Optional.of(List.of(v.split(","))));
- });
- }
+ private void mockProperties(Map keyValues) {
+ when(config.getPropertyNames()).thenReturn(keyValues.keySet());
+ keyValues.forEach((k, v) -> {
+ lenient().when(config.getValue(k, String.class)).thenReturn(v);
+ lenient().when(config.getOptionalValue(k, String.class)).thenReturn(Optional.of(v));
+ lenient().when(config.getValues(k, String.class)).thenReturn(List.of(v.split(",")));
+ lenient().when(config.getOptionalValues(k, String.class)).thenReturn(Optional.of(List.of(v.split(","))));
+ });
+ }
@Test
void sourceToTopicMapping_whenSingleSource_shouldGenerateMapping() {
diff --git a/spi/src/test/java/io/quarkiverse/kafkastreamsprocessor/spi/SourceToTopicsMappingBuilderTest.java b/spi/src/test/java/io/quarkiverse/kafkastreamsprocessor/spi/SourceToTopicsMappingBuilderTest.java
index b75c60e..3114f5c 100644
--- a/spi/src/test/java/io/quarkiverse/kafkastreamsprocessor/spi/SourceToTopicsMappingBuilderTest.java
+++ b/spi/src/test/java/io/quarkiverse/kafkastreamsprocessor/spi/SourceToTopicsMappingBuilderTest.java
@@ -63,7 +63,7 @@ private void configureSource(String name, String... topics) {
sources.put(name, sourceConfig);
}
- private void configureTopics(String... topics) {
+ private void configureTopics(String... topics) {
when(inputConfig.topics()).thenReturn(Optional.of(Arrays.asList(topics)));
}
diff --git a/test-framework/src/test/java/io/quarkiverse.kafkastreamsprocessor.testframework/QuarkusIntegrationCompatibleKafkaDevServicesResourceTest.java b/test-framework/src/test/java/io/quarkiverse.kafkastreamsprocessor.testframework/QuarkusIntegrationCompatibleKafkaDevServicesResourceTest.java
index d9ffde8..6e8af08 100644
--- a/test-framework/src/test/java/io/quarkiverse.kafkastreamsprocessor.testframework/QuarkusIntegrationCompatibleKafkaDevServicesResourceTest.java
+++ b/test-framework/src/test/java/io/quarkiverse.kafkastreamsprocessor.testframework/QuarkusIntegrationCompatibleKafkaDevServicesResourceTest.java
@@ -51,10 +51,10 @@ class QuarkusIntegrationCompatibleKafkaDevServicesResourceTest {
QuarkusTestResourceLifecycleManager.TestInjector testInjector = new DefaultTestInjectorCopy(testInstance);
- @BeforeEach
- void setUp() {
- when(context.devServicesProperties()).thenReturn(devServiceProps);
- }
+ @BeforeEach
+ void setUp() {
+ when(context.devServicesProperties()).thenReturn(devServiceProps);
+ }
@Test
public void noKafkaBootstrapServersForwardedSoNothingSet() {