Skip to content

Commit

Permalink
add consuming from dead letter topic to java
Browse files Browse the repository at this point in the history
  • Loading branch information
matt2e committed Dec 19, 2024
1 parent 2e2034b commit 68191b5
Show file tree
Hide file tree
Showing 2 changed files with 24 additions and 2 deletions.
5 changes: 3 additions & 2 deletions backend/runner/pubsub/integration_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -56,7 +56,8 @@ func TestPubSub(t *testing.T) {
func TestRetry(t *testing.T) {
retriesPerCall := 2
in.Run(t,
in.WithLanguages("java", "go"),
// in.WithLanguages("java", "go"),
in.WithLanguages("java"),

in.WithPubSub(),
in.CopyModule("publisher"),
Expand All @@ -75,7 +76,7 @@ func TestRetry(t *testing.T) {
checkPublished("subscriber", "consumeButFailAndRetryFailed", 2),

// jvm does not allow subscribing to dead letter topics yet
in.IfLanguage("go", checkConsumed("subscriber", "consumeFromDeadLetter", true, 2, optional.None[string]())),
checkConsumed("subscriber", "consumeFromDeadLetter", true, 2, optional.None[string]()),
)
}

Expand Down
Original file line number Diff line number Diff line change
@@ -1,12 +1,22 @@
package xyz.block.ftl.java.test.subscriber;

import ftl.builtin.FailedEvent;
import ftl.publisher.PubSubEvent;
import ftl.publisher.TestTopicTopic;
import ftl.publisher.Topic2Topic;
import io.quarkus.logging.Log;
import xyz.block.ftl.FromOffset;
import xyz.block.ftl.Retry;
import xyz.block.ftl.Subscription;
import xyz.block.ftl.Topic;
import xyz.block.ftl.TopicPartitionMapper;
import xyz.block.ftl.WriteableTopic;

class PartitionMapper implements TopicPartitionMapper<PubSubEvent> {
public String getPartitionKey(PubSubEvent event) {
return event.getTime().toString();
}
}

public class Subscriber {

Expand All @@ -20,9 +30,20 @@ void consumeFromLatest(PubSubEvent event) throws Exception {
Log.infof("consumeFromLatest: %s", event.getTime());
}

// Java requires the topic to be explicitly defined as an interface for consuming to work
@Topic("consumeButFailAndRetryFailed")
interface ConsumeButFailAndRetryFailedTopic extends WriteableTopic<PubSubEvent, PartitionMapper> {

}

@Subscription(topic = Topic2Topic.class, from = FromOffset.BEGINNING, deadLetter = true)
@Retry(count = 2, minBackoff = "1s", maxBackoff = "1s")
public void consumeButFailAndRetry(PubSubEvent event) {
throw new RuntimeException("always error: event " + event.getTime());
}

@Subscription(topic = ConsumeButFailAndRetryFailedTopic.class, from = FromOffset.BEGINNING)
public void consumeFromDeadLetter(builtin.FailedEvent<PubSubEvent> event) {
throw new RuntimeException("always error: event " + event.getEvent().getTime());
}
}

0 comments on commit 68191b5

Please sign in to comment.