Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat(pom): Update Quarkus to 3.15.1 #114

Merged
merged 1 commit into from
Oct 11, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
6 changes: 3 additions & 3 deletions bom/test/pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,7 @@
<artifactId>quarkus-kafka-streams-processor-test-bom</artifactId>
<packaging>pom</packaging>
<properties>
<kafka.version>3.8.0</kafka.version>
<kafka.version>3.7.1</kafka.version>
<org.mock-server.version>5.15.0</org.mock-server.version>
</properties>
<dependencyManagement>
Expand Down Expand Up @@ -43,12 +43,12 @@
<dependency>
<groupId>io.rest-assured</groupId>
<artifactId>json-path</artifactId>
<version>5.4.0</version>
<version>5.5.0</version>
</dependency>
<dependency>
<groupId>org.springframework.kafka</groupId>
<artifactId>spring-kafka-test</artifactId>
<version>3.1.3</version>
<version>3.2.4</version>
<exclusions>
<exclusion>
<groupId>org.apache.kafka</groupId>
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -115,7 +115,8 @@ private TopologyProducer newTopologyProducer(
when(dlqConfig.topic()).thenReturn(Optional.ofNullable(dlq));
when(sourceToTopicsMappingBuilder.sourceToTopicsMapping()).thenReturn(sourceToTopicMapping);
when(sinkToTopicMappingBuilder.sinkToTopicMapping()).thenReturn(sinkToTopicMapping);
TopologyProducer topologyProducer = new TopologyProducer(kStreamsProcessorConfig, configCustomizer, sourceToTopicsMappingBuilder, sinkToTopicMappingBuilder, interceptors);
TopologyProducer topologyProducer = new TopologyProducer(kStreamsProcessorConfig, configCustomizer,
sourceToTopicsMappingBuilder, sinkToTopicMappingBuilder, interceptors);
return topologyProducer;
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -55,7 +55,7 @@ void setup() {
customizer = new DefaultTopologySerdesConfiguration(objectMapper);
}

@Test
@Test
void shouldSetIntrospectionSerializerByDefault() {
when(configuration.getProcessorPayloadType()).thenReturn((Class) JSonPojo.class);
when(configuration.getSinkValueSerializer()).thenReturn(null);
Expand All @@ -74,16 +74,16 @@ void shouldSetCustomizedSinkSerializerAndValueSerde() {
assertThat(configuration.getSourceValueSerde(), is(equalTo(customSerde)));
}

@Test
@Test
void shouldSetProtobufSerdeWhenProcessorIsProtobuf() {
when(configuration.getProcessorPayloadType()).thenReturn((Class) PingMessage.Ping.class);
customizer.apply(configuration);
verify(configuration).setSourceValueSerde(isA(KafkaProtobufSerde.class));
}

@Test
@Test
void shouldSetJacksonSerdeWhenProcessorIsAPojo() {
when(configuration.getProcessorPayloadType()).thenReturn((Class)JSonPojo.class);
when(configuration.getProcessorPayloadType()).thenReturn((Class) JSonPojo.class);
customizer.apply(configuration);
verify(configuration).setSourceValueSerde(isA(JacksonSerde.class));
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -52,11 +52,11 @@ class CdiRequestContextDecoratorTest {
@Mock
ManagedContext requestContext;

@BeforeEach
public void setup() {
when(container.requestContext()).thenReturn(requestContext);
decorator = new CdiRequestContextDecorator<>(processor, container);
}
@BeforeEach
public void setup() {
when(container.requestContext()).thenReturn(requestContext);
decorator = new CdiRequestContextDecorator<>(processor, container);
}

@Test
public void shouldActivateDeactivateWhenProcessIsCalled() {
Expand All @@ -66,10 +66,10 @@ public void shouldActivateDeactivateWhenProcessIsCalled() {
verify(requestContext).terminate();
}

@Test
public void shouldNotActivateRequestScopeIfAlreadyActivated() {
when(requestContext.isActive()).thenReturn(true);
decorator.process(new Record<>("Hello", "World", 0L));
verify(requestContext, never()).activate();
}
@Test
public void shouldNotActivateRequestScopeIfAlreadyActivated() {
when(requestContext.isActive()).thenReturn(true);
decorator.process(new Record<>("Hello", "World", 0L));
verify(requestContext, never()).activate();
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -53,7 +53,7 @@ class PunctuatorDecorationProcessorDecoratorTest {
@Mock
InternalProcessorContext<String, PingMessage.Ping> context;

@Test
@Test
void contextWrapped() {
when(decoratedPunctuators.get()).thenReturn(decoratedPunctuator);
Processor<String, PingMessage.Ping, String, PingMessage.Ping> processor = new Processor<>() {
Expand All @@ -68,14 +68,14 @@ public void process(Record<String, PingMessage.Ping> record) {
}
};

PunctuatorDecorationProcessorDecorator<String, PingMessage.Ping, String, PingMessage.Ping> decorator = new PunctuatorDecorationProcessorDecorator<>(
processor,
decoratedPunctuators);
decorator.init(context);
decorator.process(new Record<>("blabla",PingMessage.Ping.newBuilder().setMessage("blabla").build(),0L,null));
decorator.close();
PunctuatorDecorationProcessorDecorator<String, PingMessage.Ping, String, PingMessage.Ping> decorator = new PunctuatorDecorationProcessorDecorator<>(
processor,
decoratedPunctuators);
decorator.init(context);
decorator.process(new Record<>("blabla", PingMessage.Ping.newBuilder().setMessage("blabla").build(), 0L, null));
decorator.close();

verify(context).schedule(Duration.ofSeconds(10), PunctuationType.WALL_CLOCK_TIME, decoratedPunctuator);
verify(context).schedule(Duration.ofSeconds(10), PunctuationType.WALL_CLOCK_TIME, decoratedPunctuator);
verify(decoratedPunctuator).setRealPunctuatorInstance(punctuator);
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -97,37 +97,37 @@ class LogAndSendToDlqExceptionHandlerDelegateTest {

@BeforeEach
void setUp() {
when(kStreamsProcessorConfig.dlq()).thenReturn(dlqConfig);
when(kStreamsProcessorConfig.dlq()).thenReturn(dlqConfig);
}

@Test
void shouldNotBlockAndSendToDlqIfPossible() {
when(record.key()).thenReturn(RECORD_KEY_BYTES);
when(record.value()).thenReturn(RECORD_VALUE_BYTES);
when(record.timestamp()).thenReturn(RECORD_TIMESTAMP);
when(record.topic()).thenReturn(INPUT_TOPIC);
when(record.partition()).thenReturn(PARTITION);
when(record.headers()).thenReturn(headers);
when(dlqConfig.topic()).thenReturn(Optional.of(DLQ_TOPIC));
when(kStreamsProcessorConfig.errorStrategy()).thenReturn(ErrorHandlingStrategy.DEAD_LETTER_QUEUE);
RecordHeaders headersWithMetadata = new RecordHeaders();
when(metadataHandler.withMetadata(any(Headers.class), anyString(), anyInt(), any(Exception.class)))
.thenReturn(headersWithMetadata);
when(kafkaClientSupplier.getProducer(any())).thenReturn(dlqProducerMock);

handler = new LogAndSendToDlqExceptionHandlerDelegate(kafkaClientSupplier, metrics, metadataHandler,
kStreamsProcessorConfig);
handler.configure(Collections.emptyMap());

DeserializationHandlerResponse response = handler.handle(context, record, exception);
assertEquals(DeserializationHandlerResponse.CONTINUE, response);

verify(dlqProducerMock)
.send(eq(new ProducerRecord<>(DLQ_TOPIC, null, RECORD_TIMESTAMP, RECORD_KEY_BYTES, RECORD_VALUE_BYTES,
headersWithMetadata)), any(Callback.class));
verify(metadataHandler).withMetadata(eq(headers), eq(INPUT_TOPIC), eq(PARTITION), eq(exception));
assertThat(metrics.processorErrorCounter().count(), closeTo(1d, 0.01d));
}
@Test
void shouldNotBlockAndSendToDlqIfPossible() {
when(record.key()).thenReturn(RECORD_KEY_BYTES);
when(record.value()).thenReturn(RECORD_VALUE_BYTES);
when(record.timestamp()).thenReturn(RECORD_TIMESTAMP);
when(record.topic()).thenReturn(INPUT_TOPIC);
when(record.partition()).thenReturn(PARTITION);
when(record.headers()).thenReturn(headers);
when(dlqConfig.topic()).thenReturn(Optional.of(DLQ_TOPIC));
when(kStreamsProcessorConfig.errorStrategy()).thenReturn(ErrorHandlingStrategy.DEAD_LETTER_QUEUE);
RecordHeaders headersWithMetadata = new RecordHeaders();
when(metadataHandler.withMetadata(any(Headers.class), anyString(), anyInt(), any(Exception.class)))
.thenReturn(headersWithMetadata);
when(kafkaClientSupplier.getProducer(any())).thenReturn(dlqProducerMock);

handler = new LogAndSendToDlqExceptionHandlerDelegate(kafkaClientSupplier, metrics, metadataHandler,
kStreamsProcessorConfig);
handler.configure(Collections.emptyMap());

DeserializationHandlerResponse response = handler.handle(context, record, exception);
assertEquals(DeserializationHandlerResponse.CONTINUE, response);

verify(dlqProducerMock)
.send(eq(new ProducerRecord<>(DLQ_TOPIC, null, RECORD_TIMESTAMP, RECORD_KEY_BYTES, RECORD_VALUE_BYTES,
headersWithMetadata)), any(Callback.class));
verify(metadataHandler).withMetadata(eq(headers), eq(INPUT_TOPIC), eq(PARTITION), eq(exception));
assertThat(metrics.processorErrorCounter().count(), closeTo(1d, 0.01d));
}

@Test
void shouldOnlyContinueIfDefaultErrorStrategy() {
Expand All @@ -136,7 +136,7 @@ void shouldOnlyContinueIfDefaultErrorStrategy() {
when(kStreamsProcessorConfig.dlq()).thenReturn(dlqConfig);
when(dlqConfig.topic()).thenReturn(Optional.of(DLQ_TOPIC));
handler = new LogAndSendToDlqExceptionHandlerDelegate(kafkaClientSupplier, metrics, metadataHandler,
kStreamsProcessorConfig);
kStreamsProcessorConfig);
handler.configure(Collections.emptyMap());

DeserializationHandlerResponse response = handler.handle(context, record, exception);
Expand All @@ -152,7 +152,7 @@ void shouldFailFastIfDlqStrategyWithoutTopic() {
when(kStreamsProcessorConfig.dlq()).thenReturn(dlqConfig);
when(kStreamsProcessorConfig.errorStrategy()).thenReturn(ErrorHandlingStrategy.DEAD_LETTER_QUEUE);
handler = new LogAndSendToDlqExceptionHandlerDelegate(kafkaClientSupplier, metrics, metadataHandler,
kStreamsProcessorConfig);
kStreamsProcessorConfig);

assertThrows(IllegalStateException.class, () -> handler.configure(Collections.emptyMap()));
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -45,13 +45,13 @@ public void setUp() {
healthCheck.kafkaStreams = streams;
}

@Test
public void testStateRunningHealthOK() {
when(streams.state()).thenReturn(KafkaStreams.State.RUNNING);
HealthCheckResponse response = healthCheck.call();
assertThat(response.getStatus(), equalTo(HealthCheckResponse.Status.UP));
assertThat(response.getData().get().get("state"), equalTo("RUNNING"));
}
@Test
public void testStateRunningHealthOK() {
when(streams.state()).thenReturn(KafkaStreams.State.RUNNING);
HealthCheckResponse response = healthCheck.call();
assertThat(response.getStatus(), equalTo(HealthCheckResponse.Status.UP));
assertThat(response.getData().get().get("state"), equalTo("RUNNING"));
}

@Test
public void testUndefinedStateHealthKO() {
Expand All @@ -60,13 +60,13 @@ public void testUndefinedStateHealthKO() {
assertThat(response.getData().get().get("state"), nullValue());
}

@Test
public void testStateRebalancingHealthKO() {
when(streams.state()).thenReturn(KafkaStreams.State.REBALANCING);
HealthCheckResponse response = healthCheck.call();
assertThat(response.getStatus(), equalTo(HealthCheckResponse.Status.DOWN));
assertThat(response.getData().get().get("state"), equalTo("REBALANCING"));
}
@Test
public void testStateRebalancingHealthKO() {
when(streams.state()).thenReturn(KafkaStreams.State.REBALANCING);
HealthCheckResponse response = healthCheck.call();
assertThat(response.getStatus(), equalTo(HealthCheckResponse.Status.DOWN));
assertThat(response.getData().get().get("state"), equalTo("REBALANCING"));
}

@Test
public void testStateKOIfKakfaStreamsNotInjected() {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -90,24 +90,26 @@ public void tearDown() {
consumer.close(Durations.ONE_SECOND);
}

@Test
void singleMessageWithRetry() {
// response "PONG"
when(client.ping())
.thenThrow(mock(RetryableException.class))
.thenThrow(mock(RetryableException.class))
.thenThrow(mock(RetryableException.class))
.thenReturn("PONG");

producer.send(new ProducerRecord<>(kStreamsProcessorConfig.input().topic().get(), Ping.newBuilder().setMessage("hello").build()));
producer.flush();

ConsumerRecord<String, Ping> singleRecord = KafkaTestUtils.getSingleRecord(consumer, kStreamsProcessorConfig.output().topic().get(),
Durations.TEN_SECONDS);
consumer.commitSync();

assertEquals("PONG of hello", singleRecord.value().getMessage());
verify(client, times(4)).ping(); // 3 retry + 1
}
@Test
void singleMessageWithRetry() {
// response "PONG"
when(client.ping())
.thenThrow(mock(RetryableException.class))
.thenThrow(mock(RetryableException.class))
.thenThrow(mock(RetryableException.class))
.thenReturn("PONG");

producer.send(new ProducerRecord<>(kStreamsProcessorConfig.input().topic().get(),
Ping.newBuilder().setMessage("hello").build()));
producer.flush();

ConsumerRecord<String, Ping> singleRecord = KafkaTestUtils.getSingleRecord(consumer,
kStreamsProcessorConfig.output().topic().get(),
Durations.TEN_SECONDS);
consumer.commitSync();

assertEquals("PONG of hello", singleRecord.value().getMessage());
verify(client, times(4)).ping(); // 3 retry + 1
}

}
Original file line number Diff line number Diff line change
Expand Up @@ -86,18 +86,19 @@ public void tearDown() {
consumer.close(Durations.ONE_SECOND);
}

@Test
void singleMessageWithoutRetry() {
// throw Exception
when(client.ping())
.thenThrow(mock(RuntimeException.class));

producer.send(new ProducerRecord<>(kStreamsProcessorConfig.input().topic().get(), Ping.newBuilder().setMessage("hello").build()));
producer.flush();

assertThrows(IllegalStateException.class, () -> {
KafkaTestUtils.getSingleRecord(consumer, kStreamsProcessorConfig.output().topic().get(),
Durations.TEN_SECONDS);
});
}
@Test
void singleMessageWithoutRetry() {
// throw Exception
when(client.ping())
.thenThrow(mock(RuntimeException.class));

producer.send(new ProducerRecord<>(kStreamsProcessorConfig.input().topic().get(),
Ping.newBuilder().setMessage("hello").build()));
producer.flush();

assertThrows(IllegalStateException.class, () -> {
KafkaTestUtils.getSingleRecord(consumer, kStreamsProcessorConfig.output().topic().get(),
Durations.TEN_SECONDS);
});
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -61,14 +61,14 @@ public void setup() {
processor.init(context);
}

@Test
public void replies_with_pong() {
when(client.ping()).thenReturn("world");
when(context.recordMetadata()).thenReturn(Optional.of(recordMetadata));
@Test
public void replies_with_pong() {
when(client.ping()).thenReturn("world");
when(context.recordMetadata()).thenReturn(Optional.of(recordMetadata));

processor.process(new Record<>("key", PingMessage.Ping.newBuilder().setMessage("hello").build(), 0L));
processor.process(new Record<>("key", PingMessage.Ping.newBuilder().setMessage("hello").build(), 0L));

verify(context).forward(captor.capture());
assertEquals("world of hello", captor.getValue().value().getMessage());
}
verify(context).forward(captor.capture());
assertEquals("world of hello", captor.getValue().value().getMessage());
}
}
6 changes: 3 additions & 3 deletions pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,7 @@
<parent>
<groupId>io.quarkiverse</groupId>
<artifactId>quarkiverse-parent</artifactId>
<version>16</version>
<version>18</version>
</parent>
<groupId>io.quarkiverse.kafkastreamsprocessor</groupId>
<artifactId>quarkus-kafka-streams-processor-parent</artifactId>
Expand Down Expand Up @@ -32,9 +32,9 @@
<maven.compiler.release>17</maven.compiler.release>
<project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
<project.reporting.outputEncoding>UTF-8</project.reporting.outputEncoding>
<quarkus.version>3.10.0</quarkus.version>
<quarkus.version>3.15.1</quarkus.version>

<protobuf.version>3.24.1</protobuf.version>
<protobuf.version>3.25.5</protobuf.version>
</properties>

<repositories>
Expand Down
Loading
Loading