From bb111f00f24b6d5e4331b0308c83c244ecf784fe Mon Sep 17 00:00:00 2001 From: edeweerd Date: Thu, 3 Oct 2024 16:11:25 +0200 Subject: [PATCH] feat(pom): Update Quarkus to 3.15.1 Related dependencies have been updated as well like restassured and kafka. Protobuf as well. Fixes #113 Moving to quarkiverse-parent version 18, impacts on formatting of Java files. --- bom/test/pom.xml | 6 +- .../impl/TopologyProducerTest.java | 3 +- ...efaultTopologySerdesConfigurationTest.java | 8 +-- .../CdiRequestContextDecoratorTest.java | 22 +++---- ...uatorDecorationProcessorDecoratorTest.java | 16 ++--- ...SendToDlqExceptionHandlerDelegateTest.java | 62 +++++++++---------- .../RebalancingTopicHealthCheckTest.java | 28 ++++----- ...ngClientProcessorQuarkusWithRetryTest.java | 40 ++++++------ ...ProcessorQuarkusWithoutRetryCatchTest.java | 29 ++++----- .../kafkatorest/PingClientProcessorTest.java | 16 ++--- pom.xml | 6 +- .../KStreamsProcessorConfigGeneratorTest.java | 18 +++--- ...nkToTopicMappingBuilderFromConfigTest.java | 18 +++--- .../spi/SinkToTopicMappingBuilderTest.java | 6 +- ...eToTopicsMappingBuilderFromConfigTest.java | 18 +++--- .../spi/SourceToTopicsMappingBuilderTest.java | 2 +- ...ompatibleKafkaDevServicesResourceTest.java | 8 +-- 17 files changed, 155 insertions(+), 151 deletions(-) diff --git a/bom/test/pom.xml b/bom/test/pom.xml index 565d2e4..62d6ae3 100644 --- a/bom/test/pom.xml +++ b/bom/test/pom.xml @@ -9,7 +9,7 @@ quarkus-kafka-streams-processor-test-bom pom - 3.8.0 + 3.7.1 5.15.0 @@ -43,12 +43,12 @@ io.rest-assured json-path - 5.4.0 + 5.5.0 org.springframework.kafka spring-kafka-test - 3.1.3 + 3.2.4 org.apache.kafka diff --git a/impl/src/test/java/io/quarkiverse/kafkastreamsprocessor/impl/TopologyProducerTest.java b/impl/src/test/java/io/quarkiverse/kafkastreamsprocessor/impl/TopologyProducerTest.java index 2e23da5..f7ecd9d 100644 --- a/impl/src/test/java/io/quarkiverse/kafkastreamsprocessor/impl/TopologyProducerTest.java +++ b/impl/src/test/java/io/quarkiverse/kafkastreamsprocessor/impl/TopologyProducerTest.java @@ -115,7 +115,8 @@ private TopologyProducer newTopologyProducer( when(dlqConfig.topic()).thenReturn(Optional.ofNullable(dlq)); when(sourceToTopicsMappingBuilder.sourceToTopicsMapping()).thenReturn(sourceToTopicMapping); when(sinkToTopicMappingBuilder.sinkToTopicMapping()).thenReturn(sinkToTopicMapping); - TopologyProducer topologyProducer = new TopologyProducer(kStreamsProcessorConfig, configCustomizer, sourceToTopicsMappingBuilder, sinkToTopicMappingBuilder, interceptors); + TopologyProducer topologyProducer = new TopologyProducer(kStreamsProcessorConfig, configCustomizer, + sourceToTopicsMappingBuilder, sinkToTopicMappingBuilder, interceptors); return topologyProducer; } diff --git a/impl/src/test/java/io/quarkiverse/kafkastreamsprocessor/impl/configuration/DefaultTopologySerdesConfigurationTest.java b/impl/src/test/java/io/quarkiverse/kafkastreamsprocessor/impl/configuration/DefaultTopologySerdesConfigurationTest.java index 13e06ec..7fa0b06 100644 --- a/impl/src/test/java/io/quarkiverse/kafkastreamsprocessor/impl/configuration/DefaultTopologySerdesConfigurationTest.java +++ b/impl/src/test/java/io/quarkiverse/kafkastreamsprocessor/impl/configuration/DefaultTopologySerdesConfigurationTest.java @@ -55,7 +55,7 @@ void setup() { customizer = new DefaultTopologySerdesConfiguration(objectMapper); } - @Test + @Test void shouldSetIntrospectionSerializerByDefault() { when(configuration.getProcessorPayloadType()).thenReturn((Class) JSonPojo.class); when(configuration.getSinkValueSerializer()).thenReturn(null); @@ -74,16 +74,16 @@ void shouldSetCustomizedSinkSerializerAndValueSerde() { assertThat(configuration.getSourceValueSerde(), is(equalTo(customSerde))); } - @Test + @Test void shouldSetProtobufSerdeWhenProcessorIsProtobuf() { when(configuration.getProcessorPayloadType()).thenReturn((Class) PingMessage.Ping.class); customizer.apply(configuration); verify(configuration).setSourceValueSerde(isA(KafkaProtobufSerde.class)); } - @Test + @Test void shouldSetJacksonSerdeWhenProcessorIsAPojo() { - when(configuration.getProcessorPayloadType()).thenReturn((Class)JSonPojo.class); + when(configuration.getProcessorPayloadType()).thenReturn((Class) JSonPojo.class); customizer.apply(configuration); verify(configuration).setSourceValueSerde(isA(JacksonSerde.class)); } diff --git a/impl/src/test/java/io/quarkiverse/kafkastreamsprocessor/impl/decorator/processor/CdiRequestContextDecoratorTest.java b/impl/src/test/java/io/quarkiverse/kafkastreamsprocessor/impl/decorator/processor/CdiRequestContextDecoratorTest.java index 3264e55..1f371a8 100644 --- a/impl/src/test/java/io/quarkiverse/kafkastreamsprocessor/impl/decorator/processor/CdiRequestContextDecoratorTest.java +++ b/impl/src/test/java/io/quarkiverse/kafkastreamsprocessor/impl/decorator/processor/CdiRequestContextDecoratorTest.java @@ -52,11 +52,11 @@ class CdiRequestContextDecoratorTest { @Mock ManagedContext requestContext; - @BeforeEach - public void setup() { - when(container.requestContext()).thenReturn(requestContext); - decorator = new CdiRequestContextDecorator<>(processor, container); - } + @BeforeEach + public void setup() { + when(container.requestContext()).thenReturn(requestContext); + decorator = new CdiRequestContextDecorator<>(processor, container); + } @Test public void shouldActivateDeactivateWhenProcessIsCalled() { @@ -66,10 +66,10 @@ public void shouldActivateDeactivateWhenProcessIsCalled() { verify(requestContext).terminate(); } - @Test - public void shouldNotActivateRequestScopeIfAlreadyActivated() { - when(requestContext.isActive()).thenReturn(true); - decorator.process(new Record<>("Hello", "World", 0L)); - verify(requestContext, never()).activate(); - } + @Test + public void shouldNotActivateRequestScopeIfAlreadyActivated() { + when(requestContext.isActive()).thenReturn(true); + decorator.process(new Record<>("Hello", "World", 0L)); + verify(requestContext, never()).activate(); + } } diff --git a/impl/src/test/java/io/quarkiverse/kafkastreamsprocessor/impl/decorator/processor/PunctuatorDecorationProcessorDecoratorTest.java b/impl/src/test/java/io/quarkiverse/kafkastreamsprocessor/impl/decorator/processor/PunctuatorDecorationProcessorDecoratorTest.java index 9a5ffa5..5187037 100644 --- a/impl/src/test/java/io/quarkiverse/kafkastreamsprocessor/impl/decorator/processor/PunctuatorDecorationProcessorDecoratorTest.java +++ b/impl/src/test/java/io/quarkiverse/kafkastreamsprocessor/impl/decorator/processor/PunctuatorDecorationProcessorDecoratorTest.java @@ -53,7 +53,7 @@ class PunctuatorDecorationProcessorDecoratorTest { @Mock InternalProcessorContext context; - @Test + @Test void contextWrapped() { when(decoratedPunctuators.get()).thenReturn(decoratedPunctuator); Processor processor = new Processor<>() { @@ -68,14 +68,14 @@ public void process(Record record) { } }; - PunctuatorDecorationProcessorDecorator decorator = new PunctuatorDecorationProcessorDecorator<>( - processor, - decoratedPunctuators); - decorator.init(context); - decorator.process(new Record<>("blabla",PingMessage.Ping.newBuilder().setMessage("blabla").build(),0L,null)); - decorator.close(); + PunctuatorDecorationProcessorDecorator decorator = new PunctuatorDecorationProcessorDecorator<>( + processor, + decoratedPunctuators); + decorator.init(context); + decorator.process(new Record<>("blabla", PingMessage.Ping.newBuilder().setMessage("blabla").build(), 0L, null)); + decorator.close(); - verify(context).schedule(Duration.ofSeconds(10), PunctuationType.WALL_CLOCK_TIME, decoratedPunctuator); + verify(context).schedule(Duration.ofSeconds(10), PunctuationType.WALL_CLOCK_TIME, decoratedPunctuator); verify(decoratedPunctuator).setRealPunctuatorInstance(punctuator); } diff --git a/impl/src/test/java/io/quarkiverse/kafkastreamsprocessor/impl/errors/LogAndSendToDlqExceptionHandlerDelegateTest.java b/impl/src/test/java/io/quarkiverse/kafkastreamsprocessor/impl/errors/LogAndSendToDlqExceptionHandlerDelegateTest.java index e0a8c5b..40ffee6 100644 --- a/impl/src/test/java/io/quarkiverse/kafkastreamsprocessor/impl/errors/LogAndSendToDlqExceptionHandlerDelegateTest.java +++ b/impl/src/test/java/io/quarkiverse/kafkastreamsprocessor/impl/errors/LogAndSendToDlqExceptionHandlerDelegateTest.java @@ -97,37 +97,37 @@ class LogAndSendToDlqExceptionHandlerDelegateTest { @BeforeEach void setUp() { - when(kStreamsProcessorConfig.dlq()).thenReturn(dlqConfig); + when(kStreamsProcessorConfig.dlq()).thenReturn(dlqConfig); } - @Test - void shouldNotBlockAndSendToDlqIfPossible() { - when(record.key()).thenReturn(RECORD_KEY_BYTES); - when(record.value()).thenReturn(RECORD_VALUE_BYTES); - when(record.timestamp()).thenReturn(RECORD_TIMESTAMP); - when(record.topic()).thenReturn(INPUT_TOPIC); - when(record.partition()).thenReturn(PARTITION); - when(record.headers()).thenReturn(headers); - when(dlqConfig.topic()).thenReturn(Optional.of(DLQ_TOPIC)); - when(kStreamsProcessorConfig.errorStrategy()).thenReturn(ErrorHandlingStrategy.DEAD_LETTER_QUEUE); - RecordHeaders headersWithMetadata = new RecordHeaders(); - when(metadataHandler.withMetadata(any(Headers.class), anyString(), anyInt(), any(Exception.class))) - .thenReturn(headersWithMetadata); - when(kafkaClientSupplier.getProducer(any())).thenReturn(dlqProducerMock); - - handler = new LogAndSendToDlqExceptionHandlerDelegate(kafkaClientSupplier, metrics, metadataHandler, - kStreamsProcessorConfig); - handler.configure(Collections.emptyMap()); - - DeserializationHandlerResponse response = handler.handle(context, record, exception); - assertEquals(DeserializationHandlerResponse.CONTINUE, response); - - verify(dlqProducerMock) - .send(eq(new ProducerRecord<>(DLQ_TOPIC, null, RECORD_TIMESTAMP, RECORD_KEY_BYTES, RECORD_VALUE_BYTES, - headersWithMetadata)), any(Callback.class)); - verify(metadataHandler).withMetadata(eq(headers), eq(INPUT_TOPIC), eq(PARTITION), eq(exception)); - assertThat(metrics.processorErrorCounter().count(), closeTo(1d, 0.01d)); - } + @Test + void shouldNotBlockAndSendToDlqIfPossible() { + when(record.key()).thenReturn(RECORD_KEY_BYTES); + when(record.value()).thenReturn(RECORD_VALUE_BYTES); + when(record.timestamp()).thenReturn(RECORD_TIMESTAMP); + when(record.topic()).thenReturn(INPUT_TOPIC); + when(record.partition()).thenReturn(PARTITION); + when(record.headers()).thenReturn(headers); + when(dlqConfig.topic()).thenReturn(Optional.of(DLQ_TOPIC)); + when(kStreamsProcessorConfig.errorStrategy()).thenReturn(ErrorHandlingStrategy.DEAD_LETTER_QUEUE); + RecordHeaders headersWithMetadata = new RecordHeaders(); + when(metadataHandler.withMetadata(any(Headers.class), anyString(), anyInt(), any(Exception.class))) + .thenReturn(headersWithMetadata); + when(kafkaClientSupplier.getProducer(any())).thenReturn(dlqProducerMock); + + handler = new LogAndSendToDlqExceptionHandlerDelegate(kafkaClientSupplier, metrics, metadataHandler, + kStreamsProcessorConfig); + handler.configure(Collections.emptyMap()); + + DeserializationHandlerResponse response = handler.handle(context, record, exception); + assertEquals(DeserializationHandlerResponse.CONTINUE, response); + + verify(dlqProducerMock) + .send(eq(new ProducerRecord<>(DLQ_TOPIC, null, RECORD_TIMESTAMP, RECORD_KEY_BYTES, RECORD_VALUE_BYTES, + headersWithMetadata)), any(Callback.class)); + verify(metadataHandler).withMetadata(eq(headers), eq(INPUT_TOPIC), eq(PARTITION), eq(exception)); + assertThat(metrics.processorErrorCounter().count(), closeTo(1d, 0.01d)); + } @Test void shouldOnlyContinueIfDefaultErrorStrategy() { @@ -136,7 +136,7 @@ void shouldOnlyContinueIfDefaultErrorStrategy() { when(kStreamsProcessorConfig.dlq()).thenReturn(dlqConfig); when(dlqConfig.topic()).thenReturn(Optional.of(DLQ_TOPIC)); handler = new LogAndSendToDlqExceptionHandlerDelegate(kafkaClientSupplier, metrics, metadataHandler, - kStreamsProcessorConfig); + kStreamsProcessorConfig); handler.configure(Collections.emptyMap()); DeserializationHandlerResponse response = handler.handle(context, record, exception); @@ -152,7 +152,7 @@ void shouldFailFastIfDlqStrategyWithoutTopic() { when(kStreamsProcessorConfig.dlq()).thenReturn(dlqConfig); when(kStreamsProcessorConfig.errorStrategy()).thenReturn(ErrorHandlingStrategy.DEAD_LETTER_QUEUE); handler = new LogAndSendToDlqExceptionHandlerDelegate(kafkaClientSupplier, metrics, metadataHandler, - kStreamsProcessorConfig); + kStreamsProcessorConfig); assertThrows(IllegalStateException.class, () -> handler.configure(Collections.emptyMap())); } diff --git a/impl/src/test/java/io/quarkiverse/kafkastreamsprocessor/impl/health/RebalancingTopicHealthCheckTest.java b/impl/src/test/java/io/quarkiverse/kafkastreamsprocessor/impl/health/RebalancingTopicHealthCheckTest.java index 933a8cc..4b9a4dc 100644 --- a/impl/src/test/java/io/quarkiverse/kafkastreamsprocessor/impl/health/RebalancingTopicHealthCheckTest.java +++ b/impl/src/test/java/io/quarkiverse/kafkastreamsprocessor/impl/health/RebalancingTopicHealthCheckTest.java @@ -45,13 +45,13 @@ public void setUp() { healthCheck.kafkaStreams = streams; } - @Test - public void testStateRunningHealthOK() { - when(streams.state()).thenReturn(KafkaStreams.State.RUNNING); - HealthCheckResponse response = healthCheck.call(); - assertThat(response.getStatus(), equalTo(HealthCheckResponse.Status.UP)); - assertThat(response.getData().get().get("state"), equalTo("RUNNING")); - } + @Test + public void testStateRunningHealthOK() { + when(streams.state()).thenReturn(KafkaStreams.State.RUNNING); + HealthCheckResponse response = healthCheck.call(); + assertThat(response.getStatus(), equalTo(HealthCheckResponse.Status.UP)); + assertThat(response.getData().get().get("state"), equalTo("RUNNING")); + } @Test public void testUndefinedStateHealthKO() { @@ -60,13 +60,13 @@ public void testUndefinedStateHealthKO() { assertThat(response.getData().get().get("state"), nullValue()); } - @Test - public void testStateRebalancingHealthKO() { - when(streams.state()).thenReturn(KafkaStreams.State.REBALANCING); - HealthCheckResponse response = healthCheck.call(); - assertThat(response.getStatus(), equalTo(HealthCheckResponse.Status.DOWN)); - assertThat(response.getData().get().get("state"), equalTo("REBALANCING")); - } + @Test + public void testStateRebalancingHealthKO() { + when(streams.state()).thenReturn(KafkaStreams.State.REBALANCING); + HealthCheckResponse response = healthCheck.call(); + assertThat(response.getStatus(), equalTo(HealthCheckResponse.Status.DOWN)); + assertThat(response.getData().get().get("state"), equalTo("REBALANCING")); + } @Test public void testStateKOIfKakfaStreamsNotInjected() { diff --git a/integration-tests/kafka-to-rest/src/test/java/io/quarkiverse/kafkastreamsprocessor/sample/kafkatorest/PingClientProcessorQuarkusWithRetryTest.java b/integration-tests/kafka-to-rest/src/test/java/io/quarkiverse/kafkastreamsprocessor/sample/kafkatorest/PingClientProcessorQuarkusWithRetryTest.java index 1bf213c..8fec6bf 100644 --- a/integration-tests/kafka-to-rest/src/test/java/io/quarkiverse/kafkastreamsprocessor/sample/kafkatorest/PingClientProcessorQuarkusWithRetryTest.java +++ b/integration-tests/kafka-to-rest/src/test/java/io/quarkiverse/kafkastreamsprocessor/sample/kafkatorest/PingClientProcessorQuarkusWithRetryTest.java @@ -90,24 +90,26 @@ public void tearDown() { consumer.close(Durations.ONE_SECOND); } - @Test - void singleMessageWithRetry() { - // response "PONG" - when(client.ping()) - .thenThrow(mock(RetryableException.class)) - .thenThrow(mock(RetryableException.class)) - .thenThrow(mock(RetryableException.class)) - .thenReturn("PONG"); - - producer.send(new ProducerRecord<>(kStreamsProcessorConfig.input().topic().get(), Ping.newBuilder().setMessage("hello").build())); - producer.flush(); - - ConsumerRecord singleRecord = KafkaTestUtils.getSingleRecord(consumer, kStreamsProcessorConfig.output().topic().get(), - Durations.TEN_SECONDS); - consumer.commitSync(); - - assertEquals("PONG of hello", singleRecord.value().getMessage()); - verify(client, times(4)).ping(); // 3 retry + 1 - } + @Test + void singleMessageWithRetry() { + // response "PONG" + when(client.ping()) + .thenThrow(mock(RetryableException.class)) + .thenThrow(mock(RetryableException.class)) + .thenThrow(mock(RetryableException.class)) + .thenReturn("PONG"); + + producer.send(new ProducerRecord<>(kStreamsProcessorConfig.input().topic().get(), + Ping.newBuilder().setMessage("hello").build())); + producer.flush(); + + ConsumerRecord singleRecord = KafkaTestUtils.getSingleRecord(consumer, + kStreamsProcessorConfig.output().topic().get(), + Durations.TEN_SECONDS); + consumer.commitSync(); + + assertEquals("PONG of hello", singleRecord.value().getMessage()); + verify(client, times(4)).ping(); // 3 retry + 1 + } } diff --git a/integration-tests/kafka-to-rest/src/test/java/io/quarkiverse/kafkastreamsprocessor/sample/kafkatorest/PingClientProcessorQuarkusWithoutRetryCatchTest.java b/integration-tests/kafka-to-rest/src/test/java/io/quarkiverse/kafkastreamsprocessor/sample/kafkatorest/PingClientProcessorQuarkusWithoutRetryCatchTest.java index 941a82f..38d72f5 100644 --- a/integration-tests/kafka-to-rest/src/test/java/io/quarkiverse/kafkastreamsprocessor/sample/kafkatorest/PingClientProcessorQuarkusWithoutRetryCatchTest.java +++ b/integration-tests/kafka-to-rest/src/test/java/io/quarkiverse/kafkastreamsprocessor/sample/kafkatorest/PingClientProcessorQuarkusWithoutRetryCatchTest.java @@ -86,18 +86,19 @@ public void tearDown() { consumer.close(Durations.ONE_SECOND); } - @Test - void singleMessageWithoutRetry() { - // throw Exception - when(client.ping()) - .thenThrow(mock(RuntimeException.class)); - - producer.send(new ProducerRecord<>(kStreamsProcessorConfig.input().topic().get(), Ping.newBuilder().setMessage("hello").build())); - producer.flush(); - - assertThrows(IllegalStateException.class, () -> { - KafkaTestUtils.getSingleRecord(consumer, kStreamsProcessorConfig.output().topic().get(), - Durations.TEN_SECONDS); - }); - } + @Test + void singleMessageWithoutRetry() { + // throw Exception + when(client.ping()) + .thenThrow(mock(RuntimeException.class)); + + producer.send(new ProducerRecord<>(kStreamsProcessorConfig.input().topic().get(), + Ping.newBuilder().setMessage("hello").build())); + producer.flush(); + + assertThrows(IllegalStateException.class, () -> { + KafkaTestUtils.getSingleRecord(consumer, kStreamsProcessorConfig.output().topic().get(), + Durations.TEN_SECONDS); + }); + } } diff --git a/integration-tests/kafka-to-rest/src/test/java/io/quarkiverse/kafkastreamsprocessor/sample/kafkatorest/PingClientProcessorTest.java b/integration-tests/kafka-to-rest/src/test/java/io/quarkiverse/kafkastreamsprocessor/sample/kafkatorest/PingClientProcessorTest.java index c408af1..fadcfbf 100644 --- a/integration-tests/kafka-to-rest/src/test/java/io/quarkiverse/kafkastreamsprocessor/sample/kafkatorest/PingClientProcessorTest.java +++ b/integration-tests/kafka-to-rest/src/test/java/io/quarkiverse/kafkastreamsprocessor/sample/kafkatorest/PingClientProcessorTest.java @@ -61,14 +61,14 @@ public void setup() { processor.init(context); } - @Test - public void replies_with_pong() { - when(client.ping()).thenReturn("world"); - when(context.recordMetadata()).thenReturn(Optional.of(recordMetadata)); + @Test + public void replies_with_pong() { + when(client.ping()).thenReturn("world"); + when(context.recordMetadata()).thenReturn(Optional.of(recordMetadata)); - processor.process(new Record<>("key", PingMessage.Ping.newBuilder().setMessage("hello").build(), 0L)); + processor.process(new Record<>("key", PingMessage.Ping.newBuilder().setMessage("hello").build(), 0L)); - verify(context).forward(captor.capture()); - assertEquals("world of hello", captor.getValue().value().getMessage()); - } + verify(context).forward(captor.capture()); + assertEquals("world of hello", captor.getValue().value().getMessage()); + } } diff --git a/pom.xml b/pom.xml index 284b435..37c54d4 100644 --- a/pom.xml +++ b/pom.xml @@ -4,7 +4,7 @@ io.quarkiverse quarkiverse-parent - 16 + 18 io.quarkiverse.kafkastreamsprocessor quarkus-kafka-streams-processor-parent @@ -32,9 +32,9 @@ 17 UTF-8 UTF-8 - 3.10.0 + 3.15.1 - 3.24.1 + 3.25.5 diff --git a/spi/src/test/java/io/quarkiverse/kafkastreamsprocessor/spi/KStreamsProcessorConfigGeneratorTest.java b/spi/src/test/java/io/quarkiverse/kafkastreamsprocessor/spi/KStreamsProcessorConfigGeneratorTest.java index 4ba357e..4f1e7bc 100644 --- a/spi/src/test/java/io/quarkiverse/kafkastreamsprocessor/spi/KStreamsProcessorConfigGeneratorTest.java +++ b/spi/src/test/java/io/quarkiverse/kafkastreamsprocessor/spi/KStreamsProcessorConfigGeneratorTest.java @@ -27,15 +27,15 @@ class KStreamsProcessorConfigGeneratorTest { @Mock Config config; - private void mockProperties(Map keyValues) { - when(config.getPropertyNames()).thenReturn(keyValues.keySet()); - keyValues.forEach((k, v) -> { - lenient().when(config.getValue(k, String.class)).thenReturn(v); - lenient().when(config.getOptionalValue(k, String.class)).thenReturn(Optional.of(v)); - lenient().when(config.getValues(k, String.class)).thenReturn(List.of(v.split(","))); - lenient().when(config.getOptionalValues(k, String.class)).thenReturn(Optional.of(List.of(v.split(",")))); - }); - } + private void mockProperties(Map keyValues) { + when(config.getPropertyNames()).thenReturn(keyValues.keySet()); + keyValues.forEach((k, v) -> { + lenient().when(config.getValue(k, String.class)).thenReturn(v); + lenient().when(config.getOptionalValue(k, String.class)).thenReturn(Optional.of(v)); + lenient().when(config.getValues(k, String.class)).thenReturn(List.of(v.split(","))); + lenient().when(config.getOptionalValues(k, String.class)).thenReturn(Optional.of(List.of(v.split(",")))); + }); + } @Test void singleSink() { diff --git a/spi/src/test/java/io/quarkiverse/kafkastreamsprocessor/spi/SinkToTopicMappingBuilderFromConfigTest.java b/spi/src/test/java/io/quarkiverse/kafkastreamsprocessor/spi/SinkToTopicMappingBuilderFromConfigTest.java index 7619b03..76fe0aa 100644 --- a/spi/src/test/java/io/quarkiverse/kafkastreamsprocessor/spi/SinkToTopicMappingBuilderFromConfigTest.java +++ b/spi/src/test/java/io/quarkiverse/kafkastreamsprocessor/spi/SinkToTopicMappingBuilderFromConfigTest.java @@ -23,15 +23,15 @@ public class SinkToTopicMappingBuilderFromConfigTest { @Mock Config config; - private void mockProperties(Map keyValues) { - when(config.getPropertyNames()).thenReturn(keyValues.keySet()); - keyValues.forEach((k, v) -> { - lenient().when(config.getValue(k, String.class)).thenReturn(v); - lenient().when(config.getOptionalValue(k, String.class)).thenReturn(Optional.of(v)); - lenient().when(config.getValues(k, String.class)).thenReturn(List.of(v.split(","))); - lenient().when(config.getOptionalValues(k, String.class)).thenReturn(Optional.of(List.of(v.split(",")))); - }); - } + private void mockProperties(Map keyValues) { + when(config.getPropertyNames()).thenReturn(keyValues.keySet()); + keyValues.forEach((k, v) -> { + lenient().when(config.getValue(k, String.class)).thenReturn(v); + lenient().when(config.getOptionalValue(k, String.class)).thenReturn(Optional.of(v)); + lenient().when(config.getValues(k, String.class)).thenReturn(List.of(v.split(","))); + lenient().when(config.getOptionalValues(k, String.class)).thenReturn(Optional.of(List.of(v.split(",")))); + }); + } @Test void sinkToTopicMapping_whenSingleSink_shouldGenerateMapping() { diff --git a/spi/src/test/java/io/quarkiverse/kafkastreamsprocessor/spi/SinkToTopicMappingBuilderTest.java b/spi/src/test/java/io/quarkiverse/kafkastreamsprocessor/spi/SinkToTopicMappingBuilderTest.java index 5358a81..07a3a26 100644 --- a/spi/src/test/java/io/quarkiverse/kafkastreamsprocessor/spi/SinkToTopicMappingBuilderTest.java +++ b/spi/src/test/java/io/quarkiverse/kafkastreamsprocessor/spi/SinkToTopicMappingBuilderTest.java @@ -62,9 +62,9 @@ private void configureSink(String name, String topic) { sinks.put(name, sinkConfig); } - private void configureTopic(String topic) { - when(outputConfig.topic()).thenReturn(Optional.of(topic)); - } + private void configureTopic(String topic) { + when(outputConfig.topic()).thenReturn(Optional.of(topic)); + } @Test void sinkToTopicMapping_whenSingleSink_shouldGenerateMapping() { diff --git a/spi/src/test/java/io/quarkiverse/kafkastreamsprocessor/spi/SourceToTopicsMappingBuilderFromConfigTest.java b/spi/src/test/java/io/quarkiverse/kafkastreamsprocessor/spi/SourceToTopicsMappingBuilderFromConfigTest.java index c5d425a..329ef83 100644 --- a/spi/src/test/java/io/quarkiverse/kafkastreamsprocessor/spi/SourceToTopicsMappingBuilderFromConfigTest.java +++ b/spi/src/test/java/io/quarkiverse/kafkastreamsprocessor/spi/SourceToTopicsMappingBuilderFromConfigTest.java @@ -27,15 +27,15 @@ public class SourceToTopicsMappingBuilderFromConfigTest { @Mock Config config; - private void mockProperties(Map keyValues) { - when(config.getPropertyNames()).thenReturn(keyValues.keySet()); - keyValues.forEach((k, v) -> { - lenient().when(config.getValue(k, String.class)).thenReturn(v); - lenient().when(config.getOptionalValue(k, String.class)).thenReturn(Optional.of(v)); - lenient().when(config.getValues(k, String.class)).thenReturn(List.of(v.split(","))); - lenient().when(config.getOptionalValues(k, String.class)).thenReturn(Optional.of(List.of(v.split(",")))); - }); - } + private void mockProperties(Map keyValues) { + when(config.getPropertyNames()).thenReturn(keyValues.keySet()); + keyValues.forEach((k, v) -> { + lenient().when(config.getValue(k, String.class)).thenReturn(v); + lenient().when(config.getOptionalValue(k, String.class)).thenReturn(Optional.of(v)); + lenient().when(config.getValues(k, String.class)).thenReturn(List.of(v.split(","))); + lenient().when(config.getOptionalValues(k, String.class)).thenReturn(Optional.of(List.of(v.split(",")))); + }); + } @Test void sourceToTopicMapping_whenSingleSource_shouldGenerateMapping() { diff --git a/spi/src/test/java/io/quarkiverse/kafkastreamsprocessor/spi/SourceToTopicsMappingBuilderTest.java b/spi/src/test/java/io/quarkiverse/kafkastreamsprocessor/spi/SourceToTopicsMappingBuilderTest.java index b75c60e..3114f5c 100644 --- a/spi/src/test/java/io/quarkiverse/kafkastreamsprocessor/spi/SourceToTopicsMappingBuilderTest.java +++ b/spi/src/test/java/io/quarkiverse/kafkastreamsprocessor/spi/SourceToTopicsMappingBuilderTest.java @@ -63,7 +63,7 @@ private void configureSource(String name, String... topics) { sources.put(name, sourceConfig); } - private void configureTopics(String... topics) { + private void configureTopics(String... topics) { when(inputConfig.topics()).thenReturn(Optional.of(Arrays.asList(topics))); } diff --git a/test-framework/src/test/java/io/quarkiverse.kafkastreamsprocessor.testframework/QuarkusIntegrationCompatibleKafkaDevServicesResourceTest.java b/test-framework/src/test/java/io/quarkiverse.kafkastreamsprocessor.testframework/QuarkusIntegrationCompatibleKafkaDevServicesResourceTest.java index d9ffde8..6e8af08 100644 --- a/test-framework/src/test/java/io/quarkiverse.kafkastreamsprocessor.testframework/QuarkusIntegrationCompatibleKafkaDevServicesResourceTest.java +++ b/test-framework/src/test/java/io/quarkiverse.kafkastreamsprocessor.testframework/QuarkusIntegrationCompatibleKafkaDevServicesResourceTest.java @@ -51,10 +51,10 @@ class QuarkusIntegrationCompatibleKafkaDevServicesResourceTest { QuarkusTestResourceLifecycleManager.TestInjector testInjector = new DefaultTestInjectorCopy(testInstance); - @BeforeEach - void setUp() { - when(context.devServicesProperties()).thenReturn(devServiceProps); - } + @BeforeEach + void setUp() { + when(context.devServicesProperties()).thenReturn(devServiceProps); + } @Test public void noKafkaBootstrapServersForwardedSoNothingSet() {