Skip to content

Commit

Permalink
GH-2246: Partition attribute SpEL expression
Browse files Browse the repository at this point in the history
Fixes: gh-2246

When partition attribute is specified as a SpEL expression on PartitionOffset
annotation, it doesn not resolve the value from the expression. Fixing this issue.

Adding test to verify.

**Cherry-pick to 3.0.x**
  • Loading branch information
tsalso authored and sobychacko committed Dec 14, 2023
1 parent 3314e79 commit 654c531
Show file tree
Hide file tree
Showing 2 changed files with 17 additions and 2 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -867,7 +867,7 @@ private List<TopicPartitionOffset> resolveTopicPartitionsList(TopicPartition top
for (String partition : partitions) {
resolvePartitionAsInteger((String) topic, resolveExpression(partition), result, null, false, false);
}
if (partitionOffsets.length == 1 && partitionOffsets[0].partition().equals("*")) {
if (partitionOffsets.length == 1 && resolveExpression(partitionOffsets[0].partition()).equals("*")) {
result.forEach(tpo -> {
tpo.setOffset(resolveInitialOffset(tpo.getTopic(), partitionOffsets[0]));
tpo.setRelativeToCurrent(isRelative(tpo.getTopic(), partitionOffsets[0]));
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -175,10 +175,11 @@
* @author Venil Noronha
* @author Dimitri Penner
* @author Nakul Mishra
* @author Soby Chacko
*/
@SpringJUnitConfig
@DirtiesContext
@EmbeddedKafka(topics = { "annotated1", "annotated2", "annotated3",
@EmbeddedKafka(topics = { "annotated1", "annotated2", "annotated3", "annotated3x",
"annotated4", "annotated5", "annotated6", "annotated7", "annotated8", "annotated8reply",
"annotated9", "annotated10",
"annotated11", "annotated12", "annotated13", "annotated14", "annotated15", "annotated16", "annotated17",
Expand Down Expand Up @@ -311,6 +312,10 @@ public void manyTests() throws Exception {
assertThat(this.listener.capturedRecord.value()).isEqualTo("foo");
assertThat(this.config.listen3Exception).isNotNull();

template.send("annotated3x", 0, "foo");
assertThat(this.listener.latch3x.await(60, TimeUnit.SECONDS)).isTrue();
assertThat(this.listener.capturedRecord.value()).isEqualTo("foo");

template.send("annotated4", 0, "foo");
assertThat(this.listener.latch4.await(60, TimeUnit.SECONDS)).isTrue();
assertThat(this.listener.capturedRecord.value()).isEqualTo("foo");
Expand Down Expand Up @@ -1832,6 +1837,8 @@ static class Listener implements ConsumerSeekAware {

final CountDownLatch latch3 = new CountDownLatch(1);

final CountDownLatch latch3x = new CountDownLatch(1);

final CountDownLatch latch4 = new CountDownLatch(1);

final CountDownLatch latch5 = new CountDownLatch(1);
Expand Down Expand Up @@ -2010,6 +2017,14 @@ public void listen3(ConsumerRecord<?, ?> record) {
this.latch3.countDown();
}

@KafkaListener(id = "partitionExpression", topicPartitions = @TopicPartition(topic = "${topicThree:annotated3x}",
partitions = "${zero:0}",
partitionOffsets = @PartitionOffset(partition = "#{'*'}", initialOffset = "0")))
public void listenPartitionSpelExpression(ConsumerRecord<?, ?> record) {
this.capturedRecord = record;
this.latch3x.countDown();
}

@KafkaListener(id = "#{'qux'}", topics = "annotated4",
containerFactory = "kafkaManualAckListenerContainerFactory", containerGroup = "qux#{'Group'}",
properties = {
Expand Down

0 comments on commit 654c531

Please sign in to comment.