Skip to content

Commit

Permalink
feat(pom): Update Quarkus to 3.15.1
Browse files Browse the repository at this point in the history
Related dependencies have been updated as well like restassured and kafka.
Protobuf as well.

Fixes quarkiverse#113

Moving to quarkiverse-parent version 18, impacts on formatting of Java files.
  • Loading branch information
edeweerd1A committed Oct 3, 2024
1 parent 3ee5f8c commit 436f880
Show file tree
Hide file tree
Showing 17 changed files with 155 additions and 151 deletions.
6 changes: 3 additions & 3 deletions bom/test/pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,7 @@
<artifactId>quarkus-kafka-streams-processor-test-bom</artifactId>
<packaging>pom</packaging>
<properties>
<kafka.version>3.6.2</kafka.version>
<kafka.version>3.7.1</kafka.version>
<org.mock-server.version>5.15.0</org.mock-server.version>
</properties>
<dependencyManagement>
Expand Down Expand Up @@ -43,12 +43,12 @@
<dependency>
<groupId>io.rest-assured</groupId>
<artifactId>json-path</artifactId>
<version>5.4.0</version>
<version>5.5.0</version>
</dependency>
<dependency>
<groupId>org.springframework.kafka</groupId>
<artifactId>spring-kafka-test</artifactId>
<version>3.1.3</version>
<version>3.2.4</version>
<exclusions>
<exclusion>
<groupId>org.apache.kafka</groupId>
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -115,7 +115,8 @@ private TopologyProducer newTopologyProducer(
when(dlqConfig.topic()).thenReturn(Optional.ofNullable(dlq));
when(sourceToTopicsMappingBuilder.sourceToTopicsMapping()).thenReturn(sourceToTopicMapping);
when(sinkToTopicMappingBuilder.sinkToTopicMapping()).thenReturn(sinkToTopicMapping);
TopologyProducer topologyProducer = new TopologyProducer(kStreamsProcessorConfig, configCustomizer, sourceToTopicsMappingBuilder, sinkToTopicMappingBuilder, interceptors);
TopologyProducer topologyProducer = new TopologyProducer(kStreamsProcessorConfig, configCustomizer,
sourceToTopicsMappingBuilder, sinkToTopicMappingBuilder, interceptors);
return topologyProducer;
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -55,7 +55,7 @@ void setup() {
customizer = new DefaultTopologySerdesConfiguration(objectMapper);
}

@Test
@Test
void shouldSetIntrospectionSerializerByDefault() {
when(configuration.getProcessorPayloadType()).thenReturn((Class) JSonPojo.class);
when(configuration.getSinkValueSerializer()).thenReturn(null);
Expand All @@ -74,16 +74,16 @@ void shouldSetCustomizedSinkSerializerAndValueSerde() {
assertThat(configuration.getSourceValueSerde(), is(equalTo(customSerde)));
}

@Test
@Test
void shouldSetProtobufSerdeWhenProcessorIsProtobuf() {
when(configuration.getProcessorPayloadType()).thenReturn((Class) PingMessage.Ping.class);
customizer.apply(configuration);
verify(configuration).setSourceValueSerde(isA(KafkaProtobufSerde.class));
}

@Test
@Test
void shouldSetJacksonSerdeWhenProcessorIsAPojo() {
when(configuration.getProcessorPayloadType()).thenReturn((Class)JSonPojo.class);
when(configuration.getProcessorPayloadType()).thenReturn((Class) JSonPojo.class);
customizer.apply(configuration);
verify(configuration).setSourceValueSerde(isA(JacksonSerde.class));
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -52,11 +52,11 @@ class CdiRequestContextDecoratorTest {
@Mock
ManagedContext requestContext;

@BeforeEach
public void setup() {
when(container.requestContext()).thenReturn(requestContext);
decorator = new CdiRequestContextDecorator<>(processor, container);
}
@BeforeEach
public void setup() {
when(container.requestContext()).thenReturn(requestContext);
decorator = new CdiRequestContextDecorator<>(processor, container);
}

@Test
public void shouldActivateDeactivateWhenProcessIsCalled() {
Expand All @@ -66,10 +66,10 @@ public void shouldActivateDeactivateWhenProcessIsCalled() {
verify(requestContext).terminate();
}

@Test
public void shouldNotActivateRequestScopeIfAlreadyActivated() {
when(requestContext.isActive()).thenReturn(true);
decorator.process(new Record<>("Hello", "World", 0L));
verify(requestContext, never()).activate();
}
@Test
public void shouldNotActivateRequestScopeIfAlreadyActivated() {
when(requestContext.isActive()).thenReturn(true);
decorator.process(new Record<>("Hello", "World", 0L));
verify(requestContext, never()).activate();
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -53,7 +53,7 @@ class PunctuatorDecorationProcessorDecoratorTest {
@Mock
InternalProcessorContext<String, PingMessage.Ping> context;

@Test
@Test
void contextWrapped() {
when(decoratedPunctuators.get()).thenReturn(decoratedPunctuator);
Processor<String, PingMessage.Ping, String, PingMessage.Ping> processor = new Processor<>() {
Expand All @@ -68,14 +68,14 @@ public void process(Record<String, PingMessage.Ping> record) {
}
};

PunctuatorDecorationProcessorDecorator<String, PingMessage.Ping, String, PingMessage.Ping> decorator = new PunctuatorDecorationProcessorDecorator<>(
processor,
decoratedPunctuators);
decorator.init(context);
decorator.process(new Record<>("blabla",PingMessage.Ping.newBuilder().setMessage("blabla").build(),0L,null));
decorator.close();
PunctuatorDecorationProcessorDecorator<String, PingMessage.Ping, String, PingMessage.Ping> decorator = new PunctuatorDecorationProcessorDecorator<>(
processor,
decoratedPunctuators);
decorator.init(context);
decorator.process(new Record<>("blabla", PingMessage.Ping.newBuilder().setMessage("blabla").build(), 0L, null));
decorator.close();

verify(context).schedule(Duration.ofSeconds(10), PunctuationType.WALL_CLOCK_TIME, decoratedPunctuator);
verify(context).schedule(Duration.ofSeconds(10), PunctuationType.WALL_CLOCK_TIME, decoratedPunctuator);
verify(decoratedPunctuator).setRealPunctuatorInstance(punctuator);
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -97,37 +97,37 @@ class LogAndSendToDlqExceptionHandlerDelegateTest {

@BeforeEach
void setUp() {
when(kStreamsProcessorConfig.dlq()).thenReturn(dlqConfig);
when(kStreamsProcessorConfig.dlq()).thenReturn(dlqConfig);
}

@Test
void shouldNotBlockAndSendToDlqIfPossible() {
when(record.key()).thenReturn(RECORD_KEY_BYTES);
when(record.value()).thenReturn(RECORD_VALUE_BYTES);
when(record.timestamp()).thenReturn(RECORD_TIMESTAMP);
when(record.topic()).thenReturn(INPUT_TOPIC);
when(record.partition()).thenReturn(PARTITION);
when(record.headers()).thenReturn(headers);
when(dlqConfig.topic()).thenReturn(Optional.of(DLQ_TOPIC));
when(kStreamsProcessorConfig.errorStrategy()).thenReturn(ErrorHandlingStrategy.DEAD_LETTER_QUEUE);
RecordHeaders headersWithMetadata = new RecordHeaders();
when(metadataHandler.withMetadata(any(Headers.class), anyString(), anyInt(), any(Exception.class)))
.thenReturn(headersWithMetadata);
when(kafkaClientSupplier.getProducer(any())).thenReturn(dlqProducerMock);

handler = new LogAndSendToDlqExceptionHandlerDelegate(kafkaClientSupplier, metrics, metadataHandler,
kStreamsProcessorConfig);
handler.configure(Collections.emptyMap());

DeserializationHandlerResponse response = handler.handle(context, record, exception);
assertEquals(DeserializationHandlerResponse.CONTINUE, response);

verify(dlqProducerMock)
.send(eq(new ProducerRecord<>(DLQ_TOPIC, null, RECORD_TIMESTAMP, RECORD_KEY_BYTES, RECORD_VALUE_BYTES,
headersWithMetadata)), any(Callback.class));
verify(metadataHandler).withMetadata(eq(headers), eq(INPUT_TOPIC), eq(PARTITION), eq(exception));
assertThat(metrics.processorErrorCounter().count(), closeTo(1d, 0.01d));
}
@Test
void shouldNotBlockAndSendToDlqIfPossible() {
when(record.key()).thenReturn(RECORD_KEY_BYTES);
when(record.value()).thenReturn(RECORD_VALUE_BYTES);
when(record.timestamp()).thenReturn(RECORD_TIMESTAMP);
when(record.topic()).thenReturn(INPUT_TOPIC);
when(record.partition()).thenReturn(PARTITION);
when(record.headers()).thenReturn(headers);
when(dlqConfig.topic()).thenReturn(Optional.of(DLQ_TOPIC));
when(kStreamsProcessorConfig.errorStrategy()).thenReturn(ErrorHandlingStrategy.DEAD_LETTER_QUEUE);
RecordHeaders headersWithMetadata = new RecordHeaders();
when(metadataHandler.withMetadata(any(Headers.class), anyString(), anyInt(), any(Exception.class)))
.thenReturn(headersWithMetadata);
when(kafkaClientSupplier.getProducer(any())).thenReturn(dlqProducerMock);

handler = new LogAndSendToDlqExceptionHandlerDelegate(kafkaClientSupplier, metrics, metadataHandler,
kStreamsProcessorConfig);
handler.configure(Collections.emptyMap());

DeserializationHandlerResponse response = handler.handle(context, record, exception);
assertEquals(DeserializationHandlerResponse.CONTINUE, response);

verify(dlqProducerMock)
.send(eq(new ProducerRecord<>(DLQ_TOPIC, null, RECORD_TIMESTAMP, RECORD_KEY_BYTES, RECORD_VALUE_BYTES,
headersWithMetadata)), any(Callback.class));
verify(metadataHandler).withMetadata(eq(headers), eq(INPUT_TOPIC), eq(PARTITION), eq(exception));
assertThat(metrics.processorErrorCounter().count(), closeTo(1d, 0.01d));
}

@Test
void shouldOnlyContinueIfDefaultErrorStrategy() {
Expand All @@ -136,7 +136,7 @@ void shouldOnlyContinueIfDefaultErrorStrategy() {
when(kStreamsProcessorConfig.dlq()).thenReturn(dlqConfig);
when(dlqConfig.topic()).thenReturn(Optional.of(DLQ_TOPIC));
handler = new LogAndSendToDlqExceptionHandlerDelegate(kafkaClientSupplier, metrics, metadataHandler,
kStreamsProcessorConfig);
kStreamsProcessorConfig);
handler.configure(Collections.emptyMap());

DeserializationHandlerResponse response = handler.handle(context, record, exception);
Expand All @@ -152,7 +152,7 @@ void shouldFailFastIfDlqStrategyWithoutTopic() {
when(kStreamsProcessorConfig.dlq()).thenReturn(dlqConfig);
when(kStreamsProcessorConfig.errorStrategy()).thenReturn(ErrorHandlingStrategy.DEAD_LETTER_QUEUE);
handler = new LogAndSendToDlqExceptionHandlerDelegate(kafkaClientSupplier, metrics, metadataHandler,
kStreamsProcessorConfig);
kStreamsProcessorConfig);

assertThrows(IllegalStateException.class, () -> handler.configure(Collections.emptyMap()));
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -45,13 +45,13 @@ public void setUp() {
healthCheck.kafkaStreams = streams;
}

@Test
public void testStateRunningHealthOK() {
when(streams.state()).thenReturn(KafkaStreams.State.RUNNING);
HealthCheckResponse response = healthCheck.call();
assertThat(response.getStatus(), equalTo(HealthCheckResponse.Status.UP));
assertThat(response.getData().get().get("state"), equalTo("RUNNING"));
}
@Test
public void testStateRunningHealthOK() {
when(streams.state()).thenReturn(KafkaStreams.State.RUNNING);
HealthCheckResponse response = healthCheck.call();
assertThat(response.getStatus(), equalTo(HealthCheckResponse.Status.UP));
assertThat(response.getData().get().get("state"), equalTo("RUNNING"));
}

@Test
public void testUndefinedStateHealthKO() {
Expand All @@ -60,13 +60,13 @@ public void testUndefinedStateHealthKO() {
assertThat(response.getData().get().get("state"), nullValue());
}

@Test
public void testStateRebalancingHealthKO() {
when(streams.state()).thenReturn(KafkaStreams.State.REBALANCING);
HealthCheckResponse response = healthCheck.call();
assertThat(response.getStatus(), equalTo(HealthCheckResponse.Status.DOWN));
assertThat(response.getData().get().get("state"), equalTo("REBALANCING"));
}
@Test
public void testStateRebalancingHealthKO() {
when(streams.state()).thenReturn(KafkaStreams.State.REBALANCING);
HealthCheckResponse response = healthCheck.call();
assertThat(response.getStatus(), equalTo(HealthCheckResponse.Status.DOWN));
assertThat(response.getData().get().get("state"), equalTo("REBALANCING"));
}

@Test
public void testStateKOIfKakfaStreamsNotInjected() {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -90,24 +90,26 @@ public void tearDown() {
consumer.close(Durations.ONE_SECOND);
}

@Test
void singleMessageWithRetry() {
// response "PONG"
when(client.ping())
.thenThrow(mock(RetryableException.class))
.thenThrow(mock(RetryableException.class))
.thenThrow(mock(RetryableException.class))
.thenReturn("PONG");

producer.send(new ProducerRecord<>(kStreamsProcessorConfig.input().topic().get(), Ping.newBuilder().setMessage("hello").build()));
producer.flush();

ConsumerRecord<String, Ping> singleRecord = KafkaTestUtils.getSingleRecord(consumer, kStreamsProcessorConfig.output().topic().get(),
Durations.TEN_SECONDS);
consumer.commitSync();

assertEquals("PONG of hello", singleRecord.value().getMessage());
verify(client, times(4)).ping(); // 3 retry + 1
}
@Test
void singleMessageWithRetry() {
// response "PONG"
when(client.ping())
.thenThrow(mock(RetryableException.class))
.thenThrow(mock(RetryableException.class))
.thenThrow(mock(RetryableException.class))
.thenReturn("PONG");

producer.send(new ProducerRecord<>(kStreamsProcessorConfig.input().topic().get(),
Ping.newBuilder().setMessage("hello").build()));
producer.flush();

ConsumerRecord<String, Ping> singleRecord = KafkaTestUtils.getSingleRecord(consumer,
kStreamsProcessorConfig.output().topic().get(),
Durations.TEN_SECONDS);
consumer.commitSync();

assertEquals("PONG of hello", singleRecord.value().getMessage());
verify(client, times(4)).ping(); // 3 retry + 1
}

}
Original file line number Diff line number Diff line change
Expand Up @@ -86,18 +86,19 @@ public void tearDown() {
consumer.close(Durations.ONE_SECOND);
}

@Test
void singleMessageWithoutRetry() {
// throw Exception
when(client.ping())
.thenThrow(mock(RuntimeException.class));

producer.send(new ProducerRecord<>(kStreamsProcessorConfig.input().topic().get(), Ping.newBuilder().setMessage("hello").build()));
producer.flush();

assertThrows(IllegalStateException.class, () -> {
KafkaTestUtils.getSingleRecord(consumer, kStreamsProcessorConfig.output().topic().get(),
Durations.TEN_SECONDS);
});
}
@Test
void singleMessageWithoutRetry() {
// throw Exception
when(client.ping())
.thenThrow(mock(RuntimeException.class));

producer.send(new ProducerRecord<>(kStreamsProcessorConfig.input().topic().get(),
Ping.newBuilder().setMessage("hello").build()));
producer.flush();

assertThrows(IllegalStateException.class, () -> {
KafkaTestUtils.getSingleRecord(consumer, kStreamsProcessorConfig.output().topic().get(),
Durations.TEN_SECONDS);
});
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -61,14 +61,14 @@ public void setup() {
processor.init(context);
}

@Test
public void replies_with_pong() {
when(client.ping()).thenReturn("world");
when(context.recordMetadata()).thenReturn(Optional.of(recordMetadata));
@Test
public void replies_with_pong() {
when(client.ping()).thenReturn("world");
when(context.recordMetadata()).thenReturn(Optional.of(recordMetadata));

processor.process(new Record<>("key", PingMessage.Ping.newBuilder().setMessage("hello").build(), 0L));
processor.process(new Record<>("key", PingMessage.Ping.newBuilder().setMessage("hello").build(), 0L));

verify(context).forward(captor.capture());
assertEquals("world of hello", captor.getValue().value().getMessage());
}
verify(context).forward(captor.capture());
assertEquals("world of hello", captor.getValue().value().getMessage());
}
}
6 changes: 3 additions & 3 deletions pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,7 @@
<parent>
<groupId>io.quarkiverse</groupId>
<artifactId>quarkiverse-parent</artifactId>
<version>16</version>
<version>18</version>
</parent>
<groupId>io.quarkiverse.kafkastreamsprocessor</groupId>
<artifactId>quarkus-kafka-streams-processor-parent</artifactId>
Expand Down Expand Up @@ -32,9 +32,9 @@
<maven.compiler.release>17</maven.compiler.release>
<project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
<project.reporting.outputEncoding>UTF-8</project.reporting.outputEncoding>
<quarkus.version>3.10.0</quarkus.version>
<quarkus.version>3.15.1</quarkus.version>

<protobuf.version>3.24.1</protobuf.version>
<protobuf.version>3.25.5</protobuf.version>
</properties>

<repositories>
Expand Down
Loading

0 comments on commit 436f880

Please sign in to comment.