feat(pom): Update Quarkus to 3.15.1
Related dependencies have been updated as well, such as REST Assured, Kafka, and Protobuf.

Fixes #113

Moving to quarkiverse-parent version 18 also impacts the formatting of Java files.
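
In one place, the consolidated version changes (an abridged sketch assembled from the hunks below, not the full content of either pom.xml):

<!-- pom.xml -->
<parent>
    <groupId>io.quarkiverse</groupId>
    <artifactId>quarkiverse-parent</artifactId>
    <version>18</version> <!-- was 16 -->
</parent>
<properties>
    <quarkus.version>3.15.1</quarkus.version> <!-- was 3.10.0 -->
    <protobuf.version>3.25.5</protobuf.version> <!-- was 3.24.1 -->
</properties>

<!-- bom/test/pom.xml -->
<properties>
    <kafka.version>3.7.1</kafka.version> <!-- was 3.8.0 -->
</properties>
<dependency>
    <groupId>io.rest-assured</groupId>
    <artifactId>json-path</artifactId>
    <version>5.5.0</version> <!-- was 5.4.0 -->
</dependency>
<dependency>
    <groupId>org.springframework.kafka</groupId>
    <artifactId>spring-kafka-test</artifactId>
    <version>3.2.4</version> <!-- was 3.1.3 -->
</dependency>

Note that kafka.version moves down from 3.8.0 to 3.7.1, presumably to align with the Kafka client version managed by Quarkus 3.15.1.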
edeweerd1A committed Oct 11, 2024
1 parent 5eab6d5 commit 32bd545
Showing 17 changed files with 155 additions and 151 deletions.
6 changes: 3 additions & 3 deletions bom/test/pom.xml
@@ -9,7 +9,7 @@
<artifactId>quarkus-kafka-streams-processor-test-bom</artifactId>
<packaging>pom</packaging>
<properties>
-<kafka.version>3.8.0</kafka.version>
+<kafka.version>3.7.1</kafka.version>
<org.mock-server.version>5.15.0</org.mock-server.version>
</properties>
<dependencyManagement>
@@ -43,12 +43,12 @@
<dependency>
<groupId>io.rest-assured</groupId>
<artifactId>json-path</artifactId>
-<version>5.4.0</version>
+<version>5.5.0</version>
</dependency>
<dependency>
<groupId>org.springframework.kafka</groupId>
<artifactId>spring-kafka-test</artifactId>
-<version>3.1.3</version>
+<version>3.2.4</version>
<exclusions>
<exclusion>
<groupId>org.apache.kafka</groupId>
@@ -115,7 +115,8 @@ private TopologyProducer newTopologyProducer(
when(dlqConfig.topic()).thenReturn(Optional.ofNullable(dlq));
when(sourceToTopicsMappingBuilder.sourceToTopicsMapping()).thenReturn(sourceToTopicMapping);
when(sinkToTopicMappingBuilder.sinkToTopicMapping()).thenReturn(sinkToTopicMapping);
-TopologyProducer topologyProducer = new TopologyProducer(kStreamsProcessorConfig, configCustomizer, sourceToTopicsMappingBuilder, sinkToTopicMappingBuilder, interceptors);
+TopologyProducer topologyProducer = new TopologyProducer(kStreamsProcessorConfig, configCustomizer,
+sourceToTopicsMappingBuilder, sinkToTopicMappingBuilder, interceptors);
return topologyProducer;
}

@@ -55,7 +55,7 @@ void setup() {
customizer = new DefaultTopologySerdesConfiguration(objectMapper);
}

-@Test
+@Test
void shouldSetIntrospectionSerializerByDefault() {
when(configuration.getProcessorPayloadType()).thenReturn((Class) JSonPojo.class);
when(configuration.getSinkValueSerializer()).thenReturn(null);
@@ -74,16 +74,16 @@ void shouldSetCustomizedSinkSerializerAndValueSerde() {
assertThat(configuration.getSourceValueSerde(), is(equalTo(customSerde)));
}

-@Test
+@Test
void shouldSetProtobufSerdeWhenProcessorIsProtobuf() {
when(configuration.getProcessorPayloadType()).thenReturn((Class) PingMessage.Ping.class);
customizer.apply(configuration);
verify(configuration).setSourceValueSerde(isA(KafkaProtobufSerde.class));
}

-@Test
+@Test
void shouldSetJacksonSerdeWhenProcessorIsAPojo() {
-when(configuration.getProcessorPayloadType()).thenReturn((Class)JSonPojo.class);
+when(configuration.getProcessorPayloadType()).thenReturn((Class) JSonPojo.class);
customizer.apply(configuration);
verify(configuration).setSourceValueSerde(isA(JacksonSerde.class));
}
@@ -52,11 +52,11 @@ class CdiRequestContextDecoratorTest {
@Mock
ManagedContext requestContext;

-@BeforeEach
-public void setup() {
-when(container.requestContext()).thenReturn(requestContext);
-decorator = new CdiRequestContextDecorator<>(processor, container);
-}
+@BeforeEach
+public void setup() {
+when(container.requestContext()).thenReturn(requestContext);
+decorator = new CdiRequestContextDecorator<>(processor, container);
+}

@Test
public void shouldActivateDeactivateWhenProcessIsCalled() {
@@ -66,10 +66,10 @@ public void shouldActivateDeactivateWhenProcessIsCalled() {
verify(requestContext).terminate();
}

-@Test
-public void shouldNotActivateRequestScopeIfAlreadyActivated() {
-when(requestContext.isActive()).thenReturn(true);
-decorator.process(new Record<>("Hello", "World", 0L));
-verify(requestContext, never()).activate();
-}
+@Test
+public void shouldNotActivateRequestScopeIfAlreadyActivated() {
+when(requestContext.isActive()).thenReturn(true);
+decorator.process(new Record<>("Hello", "World", 0L));
+verify(requestContext, never()).activate();
+}
}
@@ -53,7 +53,7 @@ class PunctuatorDecorationProcessorDecoratorTest {
@Mock
InternalProcessorContext<String, PingMessage.Ping> context;

-@Test
+@Test
void contextWrapped() {
when(decoratedPunctuators.get()).thenReturn(decoratedPunctuator);
Processor<String, PingMessage.Ping, String, PingMessage.Ping> processor = new Processor<>() {
@@ -68,14 +68,14 @@ public void process(Record<String, PingMessage.Ping> record) {
}
};

-PunctuatorDecorationProcessorDecorator<String, PingMessage.Ping, String, PingMessage.Ping> decorator = new PunctuatorDecorationProcessorDecorator<>(
-processor,
-decoratedPunctuators);
-decorator.init(context);
-decorator.process(new Record<>("blabla",PingMessage.Ping.newBuilder().setMessage("blabla").build(),0L,null));
-decorator.close();
+PunctuatorDecorationProcessorDecorator<String, PingMessage.Ping, String, PingMessage.Ping> decorator = new PunctuatorDecorationProcessorDecorator<>(
+processor,
+decoratedPunctuators);
+decorator.init(context);
+decorator.process(new Record<>("blabla", PingMessage.Ping.newBuilder().setMessage("blabla").build(), 0L, null));
+decorator.close();

-verify(context).schedule(Duration.ofSeconds(10), PunctuationType.WALL_CLOCK_TIME, decoratedPunctuator);
+verify(context).schedule(Duration.ofSeconds(10), PunctuationType.WALL_CLOCK_TIME, decoratedPunctuator);
verify(decoratedPunctuator).setRealPunctuatorInstance(punctuator);
}

@@ -97,37 +97,37 @@ class LogAndSendToDlqExceptionHandlerDelegateTest {

@BeforeEach
void setUp() {
-when(kStreamsProcessorConfig.dlq()).thenReturn(dlqConfig);
+when(kStreamsProcessorConfig.dlq()).thenReturn(dlqConfig);
}

-@Test
-void shouldNotBlockAndSendToDlqIfPossible() {
-when(record.key()).thenReturn(RECORD_KEY_BYTES);
-when(record.value()).thenReturn(RECORD_VALUE_BYTES);
-when(record.timestamp()).thenReturn(RECORD_TIMESTAMP);
-when(record.topic()).thenReturn(INPUT_TOPIC);
-when(record.partition()).thenReturn(PARTITION);
-when(record.headers()).thenReturn(headers);
-when(dlqConfig.topic()).thenReturn(Optional.of(DLQ_TOPIC));
-when(kStreamsProcessorConfig.errorStrategy()).thenReturn(ErrorHandlingStrategy.DEAD_LETTER_QUEUE);
-RecordHeaders headersWithMetadata = new RecordHeaders();
-when(metadataHandler.withMetadata(any(Headers.class), anyString(), anyInt(), any(Exception.class)))
-.thenReturn(headersWithMetadata);
-when(kafkaClientSupplier.getProducer(any())).thenReturn(dlqProducerMock);
-
-handler = new LogAndSendToDlqExceptionHandlerDelegate(kafkaClientSupplier, metrics, metadataHandler,
-kStreamsProcessorConfig);
-handler.configure(Collections.emptyMap());
-
-DeserializationHandlerResponse response = handler.handle(context, record, exception);
-assertEquals(DeserializationHandlerResponse.CONTINUE, response);
-
-verify(dlqProducerMock)
-.send(eq(new ProducerRecord<>(DLQ_TOPIC, null, RECORD_TIMESTAMP, RECORD_KEY_BYTES, RECORD_VALUE_BYTES,
-headersWithMetadata)), any(Callback.class));
-verify(metadataHandler).withMetadata(eq(headers), eq(INPUT_TOPIC), eq(PARTITION), eq(exception));
-assertThat(metrics.processorErrorCounter().count(), closeTo(1d, 0.01d));
-}
+@Test
+void shouldNotBlockAndSendToDlqIfPossible() {
+when(record.key()).thenReturn(RECORD_KEY_BYTES);
+when(record.value()).thenReturn(RECORD_VALUE_BYTES);
+when(record.timestamp()).thenReturn(RECORD_TIMESTAMP);
+when(record.topic()).thenReturn(INPUT_TOPIC);
+when(record.partition()).thenReturn(PARTITION);
+when(record.headers()).thenReturn(headers);
+when(dlqConfig.topic()).thenReturn(Optional.of(DLQ_TOPIC));
+when(kStreamsProcessorConfig.errorStrategy()).thenReturn(ErrorHandlingStrategy.DEAD_LETTER_QUEUE);
+RecordHeaders headersWithMetadata = new RecordHeaders();
+when(metadataHandler.withMetadata(any(Headers.class), anyString(), anyInt(), any(Exception.class)))
+.thenReturn(headersWithMetadata);
+when(kafkaClientSupplier.getProducer(any())).thenReturn(dlqProducerMock);
+
+handler = new LogAndSendToDlqExceptionHandlerDelegate(kafkaClientSupplier, metrics, metadataHandler,
+kStreamsProcessorConfig);
+handler.configure(Collections.emptyMap());
+
+DeserializationHandlerResponse response = handler.handle(context, record, exception);
+assertEquals(DeserializationHandlerResponse.CONTINUE, response);
+
+verify(dlqProducerMock)
+.send(eq(new ProducerRecord<>(DLQ_TOPIC, null, RECORD_TIMESTAMP, RECORD_KEY_BYTES, RECORD_VALUE_BYTES,
+headersWithMetadata)), any(Callback.class));
+verify(metadataHandler).withMetadata(eq(headers), eq(INPUT_TOPIC), eq(PARTITION), eq(exception));
+assertThat(metrics.processorErrorCounter().count(), closeTo(1d, 0.01d));
+}

@Test
void shouldOnlyContinueIfDefaultErrorStrategy() {
@@ -136,7 +136,7 @@
when(kStreamsProcessorConfig.dlq()).thenReturn(dlqConfig);
when(dlqConfig.topic()).thenReturn(Optional.of(DLQ_TOPIC));
handler = new LogAndSendToDlqExceptionHandlerDelegate(kafkaClientSupplier, metrics, metadataHandler,
-kStreamsProcessorConfig);
+kStreamsProcessorConfig);
handler.configure(Collections.emptyMap());

DeserializationHandlerResponse response = handler.handle(context, record, exception);
@@ -152,7 +152,7 @@ void shouldFailFastIfDlqStrategyWithoutTopic() {
when(kStreamsProcessorConfig.dlq()).thenReturn(dlqConfig);
when(kStreamsProcessorConfig.errorStrategy()).thenReturn(ErrorHandlingStrategy.DEAD_LETTER_QUEUE);
handler = new LogAndSendToDlqExceptionHandlerDelegate(kafkaClientSupplier, metrics, metadataHandler,
-kStreamsProcessorConfig);
+kStreamsProcessorConfig);

assertThrows(IllegalStateException.class, () -> handler.configure(Collections.emptyMap()));
}
@@ -45,13 +45,13 @@ public void setUp() {
healthCheck.kafkaStreams = streams;
}

-@Test
-public void testStateRunningHealthOK() {
-when(streams.state()).thenReturn(KafkaStreams.State.RUNNING);
-HealthCheckResponse response = healthCheck.call();
-assertThat(response.getStatus(), equalTo(HealthCheckResponse.Status.UP));
-assertThat(response.getData().get().get("state"), equalTo("RUNNING"));
-}
+@Test
+public void testStateRunningHealthOK() {
+when(streams.state()).thenReturn(KafkaStreams.State.RUNNING);
+HealthCheckResponse response = healthCheck.call();
+assertThat(response.getStatus(), equalTo(HealthCheckResponse.Status.UP));
+assertThat(response.getData().get().get("state"), equalTo("RUNNING"));
+}

@Test
public void testUndefinedStateHealthKO() {
@@ -60,13 +60,13 @@
assertThat(response.getData().get().get("state"), nullValue());
}

-@Test
-public void testStateRebalancingHealthKO() {
-when(streams.state()).thenReturn(KafkaStreams.State.REBALANCING);
-HealthCheckResponse response = healthCheck.call();
-assertThat(response.getStatus(), equalTo(HealthCheckResponse.Status.DOWN));
-assertThat(response.getData().get().get("state"), equalTo("REBALANCING"));
-}
+@Test
+public void testStateRebalancingHealthKO() {
+when(streams.state()).thenReturn(KafkaStreams.State.REBALANCING);
+HealthCheckResponse response = healthCheck.call();
+assertThat(response.getStatus(), equalTo(HealthCheckResponse.Status.DOWN));
+assertThat(response.getData().get().get("state"), equalTo("REBALANCING"));
+}

@Test
public void testStateKOIfKakfaStreamsNotInjected() {
@@ -90,24 +90,26 @@ public void tearDown() {
consumer.close(Durations.ONE_SECOND);
}

-@Test
-void singleMessageWithRetry() {
-// response "PONG"
-when(client.ping())
-.thenThrow(mock(RetryableException.class))
-.thenThrow(mock(RetryableException.class))
-.thenThrow(mock(RetryableException.class))
-.thenReturn("PONG");
-
-producer.send(new ProducerRecord<>(kStreamsProcessorConfig.input().topic().get(), Ping.newBuilder().setMessage("hello").build()));
-producer.flush();
-
-ConsumerRecord<String, Ping> singleRecord = KafkaTestUtils.getSingleRecord(consumer, kStreamsProcessorConfig.output().topic().get(),
-Durations.TEN_SECONDS);
-consumer.commitSync();
-
-assertEquals("PONG of hello", singleRecord.value().getMessage());
-verify(client, times(4)).ping(); // 3 retry + 1
-}
+@Test
+void singleMessageWithRetry() {
+// response "PONG"
+when(client.ping())
+.thenThrow(mock(RetryableException.class))
+.thenThrow(mock(RetryableException.class))
+.thenThrow(mock(RetryableException.class))
+.thenReturn("PONG");
+
+producer.send(new ProducerRecord<>(kStreamsProcessorConfig.input().topic().get(),
+Ping.newBuilder().setMessage("hello").build()));
+producer.flush();
+
+ConsumerRecord<String, Ping> singleRecord = KafkaTestUtils.getSingleRecord(consumer,
+kStreamsProcessorConfig.output().topic().get(),
+Durations.TEN_SECONDS);
+consumer.commitSync();
+
+assertEquals("PONG of hello", singleRecord.value().getMessage());
+verify(client, times(4)).ping(); // 3 retry + 1
+}

}
@@ -86,18 +86,19 @@ public void tearDown() {
consumer.close(Durations.ONE_SECOND);
}

-@Test
-void singleMessageWithoutRetry() {
-// throw Exception
-when(client.ping())
-.thenThrow(mock(RuntimeException.class));
-
-producer.send(new ProducerRecord<>(kStreamsProcessorConfig.input().topic().get(), Ping.newBuilder().setMessage("hello").build()));
-producer.flush();
-
-assertThrows(IllegalStateException.class, () -> {
-KafkaTestUtils.getSingleRecord(consumer, kStreamsProcessorConfig.output().topic().get(),
-Durations.TEN_SECONDS);
-});
-}
+@Test
+void singleMessageWithoutRetry() {
+// throw Exception
+when(client.ping())
+.thenThrow(mock(RuntimeException.class));
+
+producer.send(new ProducerRecord<>(kStreamsProcessorConfig.input().topic().get(),
+Ping.newBuilder().setMessage("hello").build()));
+producer.flush();
+
+assertThrows(IllegalStateException.class, () -> {
+KafkaTestUtils.getSingleRecord(consumer, kStreamsProcessorConfig.output().topic().get(),
+Durations.TEN_SECONDS);
+});
+}
}
@@ -61,14 +61,14 @@ public void setup() {
processor.init(context);
}

-@Test
-public void replies_with_pong() {
-when(client.ping()).thenReturn("world");
-when(context.recordMetadata()).thenReturn(Optional.of(recordMetadata));
+@Test
+public void replies_with_pong() {
+when(client.ping()).thenReturn("world");
+when(context.recordMetadata()).thenReturn(Optional.of(recordMetadata));

-processor.process(new Record<>("key", PingMessage.Ping.newBuilder().setMessage("hello").build(), 0L));
+processor.process(new Record<>("key", PingMessage.Ping.newBuilder().setMessage("hello").build(), 0L));

-verify(context).forward(captor.capture());
-assertEquals("world of hello", captor.getValue().value().getMessage());
-}
+verify(context).forward(captor.capture());
+assertEquals("world of hello", captor.getValue().value().getMessage());
+}
}
6 changes: 3 additions & 3 deletions pom.xml
@@ -4,7 +4,7 @@
<parent>
<groupId>io.quarkiverse</groupId>
<artifactId>quarkiverse-parent</artifactId>
-<version>16</version>
+<version>18</version>
</parent>
<groupId>io.quarkiverse.kafkastreamsprocessor</groupId>
<artifactId>quarkus-kafka-streams-processor-parent</artifactId>
@@ -32,9 +32,9 @@
<maven.compiler.release>17</maven.compiler.release>
<project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
<project.reporting.outputEncoding>UTF-8</project.reporting.outputEncoding>
-<quarkus.version>3.10.0</quarkus.version>
+<quarkus.version>3.15.1</quarkus.version>

-<protobuf.version>3.24.1</protobuf.version>
+<protobuf.version>3.25.5</protobuf.version>
</properties>

<repositories>