From 0543011d59dd6fd4fb46b38f1f1460a55c7d3ec3 Mon Sep 17 00:00:00 2001 From: Cassie Coyle Date: Thu, 14 Sep 2023 19:28:29 -0500 Subject: [PATCH] Updated enable dead letter topic (#913) * feat: Add dead letter topic to a subscription topic. Signed-off-by: Charlie Mk * feat: Add endpoint examples Signed-off-by: Charlie Mk * feat: Add dead letter topic example to the docs. Signed-off-by: Charlie Mk * Create new methods with deadLetterTopic instead of overload existing ones. Signed-off-by: Charlie Mk * Update _index.md Signed-off-by: Artur Souza * Create overloads for deadlettertopic. Signed-off-by: Artur Souza * fix integration tests from names overlapping Signed-off-by: Cassandra Coyle --------- Signed-off-by: Charlie Mk Signed-off-by: Artur Souza Signed-off-by: Cassandra Coyle Co-authored-by: Charlie Mk Co-authored-by: Artur Souza Co-authored-by: Artur Souza Co-authored-by: Mukundan Sundararajan <65565396+mukundansundar@users.noreply.github.com> Co-authored-by: Yaron Schneider --- daprdocs/content/en/java-sdk-docs/_index.md | 20 +++++ .../pubsub/http/SubscriberController.java | 31 ++++++- .../springboot/DaprBeanPostProcessor.java | 3 +- .../java/io/dapr/springboot/DaprRuntime.java | 83 +++++++++++++---- .../springboot/DaprSubscriptionBuilder.java | 53 ++++++++--- .../springboot/DaprTopicSubscription.java | 88 +++++++++++++++++-- .../DaprBeanPostProcessorSubscribeTest.java | 2 + .../io/dapr/springboot/DaprRuntimeTest.java | 10 ++- .../MockControllerWithSubscribe.java | 5 +- sdk/src/main/java/io/dapr/Topic.java | 6 ++ 10 files changed, 255 insertions(+), 46 deletions(-) diff --git a/daprdocs/content/en/java-sdk-docs/_index.md b/daprdocs/content/en/java-sdk-docs/_index.md index 47357792c..3982212af 100644 --- a/daprdocs/content/en/java-sdk-docs/_index.md +++ b/daprdocs/content/en/java-sdk-docs/_index.md @@ -183,6 +183,26 @@ public class SubscriberController { }); } + /** + * Handles a registered publish endpoint on this app adding a topic which manage to forward undeliverable messages. + * + * @param cloudEvent The cloud event received. + * @return A message containing the time. + */ + @Topic(name = "testingtopic", pubsubName = "${myAppProperty:messagebus}", + deadLetterTopic = "${deadLetterProperty:deadTopic}") + @PostMapping(path = "/testingtopic") + public Mono handleMessageWithErrorHandler(@RequestBody(required = false) CloudEvent cloudEvent) { + return Mono.fromRunnable(() -> { + try { + System.out.println("Subscriber got: " + cloudEvent.getData()); + System.out.println("Subscriber got: " + OBJECT_MAPPER.writeValueAsString(cloudEvent)); + } catch (Exception e) { + throw new RuntimeException(e); + } + }); + } + @Topic(name = "testingtopic", pubsubName = "${myAppProperty:messagebus}", rule = @Rule(match = "event.type == 'myevent.v2'", priority = 1)) @PostMapping(path = "/testingtopicV2") diff --git a/examples/src/main/java/io/dapr/examples/pubsub/http/SubscriberController.java b/examples/src/main/java/io/dapr/examples/pubsub/http/SubscriberController.java index e096402de..c218d486c 100644 --- a/examples/src/main/java/io/dapr/examples/pubsub/http/SubscriberController.java +++ b/examples/src/main/java/io/dapr/examples/pubsub/http/SubscriberController.java @@ -39,9 +39,10 @@ public class SubscriberController { private static final ObjectMapper OBJECT_MAPPER = new ObjectMapper(); + /** * Handles a registered publish endpoint on this app. - * + * * @param cloudEvent The cloud event received. * @return A message containing the time. */ @@ -58,6 +59,27 @@ public Mono handleMessage(@RequestBody(required = false) CloudEvent handleMessageWithErrorHandler(@RequestBody(required = false) CloudEvent cloudEvent) { + return Mono.fromRunnable(() -> { + try { + System.out.println("Subscriber got: " + cloudEvent.getData()); + System.out.println("Subscriber got: " + OBJECT_MAPPER.writeValueAsString(cloudEvent)); + } catch (Exception e) { + throw new RuntimeException(e); + } + }); + } + /** * Handles a registered publish endpoint on this app (version 2 of a cloud * event). @@ -66,6 +88,7 @@ public Mono handleMessage(@RequestBody(required = false) CloudEvent handleMessageV2(@RequestBody(required = false) CloudEvent cloudEvent) { @@ -84,7 +107,8 @@ public Mono handleMessageV2(@RequestBody(required = false) CloudEvent clou * @param cloudEvent The cloud event received. * @return A message containing the time. */ - @Topic(name = "bulkpublishtesting", pubsubName = "${myAppProperty:messagebus}") + @Topic(name = "bulkpublishtesting", pubsubName = "${myAppProperty:messagebus}", + deadLetterTopic = "${deadLetterProperty:deadTopic}") @PostMapping(path = "/bulkpublishtesting") public Mono handleBulkPublishMessage(@RequestBody(required = false) CloudEvent cloudEvent) { return Mono.fromRunnable(() -> { @@ -104,7 +128,8 @@ public Mono handleBulkPublishMessage(@RequestBody(required = false) CloudE * @return A list of responses for each event. */ @BulkSubscribe() - @Topic(name = "testingtopicbulk", pubsubName = "${myAppProperty:messagebus}") + @Topic(name = "testingtopicbulk", pubsubName = "${myAppProperty:messagebus}", + deadLetterTopic = "${deadLetterProperty:deadTopic}") @PostMapping(path = "/testingtopicbulk") public Mono handleBulkMessage( @RequestBody(required = false) BulkSubscribeMessage> bulkMessage) { diff --git a/sdk-springboot/src/main/java/io/dapr/springboot/DaprBeanPostProcessor.java b/sdk-springboot/src/main/java/io/dapr/springboot/DaprBeanPostProcessor.java index 928102d21..b33181884 100644 --- a/sdk-springboot/src/main/java/io/dapr/springboot/DaprBeanPostProcessor.java +++ b/sdk-springboot/src/main/java/io/dapr/springboot/DaprBeanPostProcessor.java @@ -107,6 +107,7 @@ private static void subscribeToTopics( Rule rule = topic.rule(); String topicName = stringValueResolver.resolveStringValue(topic.name()); String pubSubName = stringValueResolver.resolveStringValue(topic.pubsubName()); + String deadLetterTopic = stringValueResolver.resolveStringValue(topic.deadLetterTopic()); String match = stringValueResolver.resolveStringValue(rule.match()); if ((topicName != null) && (topicName.length() > 0) && pubSubName != null && pubSubName.length() > 0) { try { @@ -116,7 +117,7 @@ private static void subscribeToTopics( List routes = getAllCompleteRoutesForPost(clazz, method, topicName); for (String route : routes) { daprRuntime.addSubscribedTopic( - pubSubName, topicName, match, rule.priority(), route, metadata, bulkSubscribe); + pubSubName, topicName, match, rule.priority(), route, deadLetterTopic, metadata, bulkSubscribe); } } catch (JsonProcessingException e) { throw new IllegalArgumentException("Error while parsing metadata: " + e); diff --git a/sdk-springboot/src/main/java/io/dapr/springboot/DaprRuntime.java b/sdk-springboot/src/main/java/io/dapr/springboot/DaprRuntime.java index dff215b21..62bb9d9ff 100644 --- a/sdk-springboot/src/main/java/io/dapr/springboot/DaprRuntime.java +++ b/sdk-springboot/src/main/java/io/dapr/springboot/DaprRuntime.java @@ -22,6 +22,7 @@ * Internal Singleton to handle Dapr configuration. */ class DaprRuntime { + /** * The singleton instance. */ @@ -33,8 +34,8 @@ class DaprRuntime { private final Map subscriptionBuilders = new HashMap<>(); /** - * DaprRuntime should be used as a singleton, using {@link DaprRuntime#getInstance()}. - * The constructor's default scope is available for unit tests only. + * DaprRuntime should be used as a singleton, using {@link DaprRuntime#getInstance()}. The + * constructor's default scope is available for unit tests only. */ private DaprRuntime() { } @@ -60,30 +61,30 @@ public static DaprRuntime getInstance() { * Adds a topic to the list of subscribed topics. * * @param pubSubName PubSub name to subscribe to. - * @param topicName Name of the topic being subscribed to. - * @param match Match expression for this route. - * @param priority Priority for this match relative to others. - * @param route Destination route for requests. - * @param metadata Metadata for extended subscription functionality. + * @param topicName Name of the topic being subscribed to. + * @param match Match expression for this route. + * @param priority Priority for this match relative to others. + * @param route Destination route for requests. + * @param metadata Metadata for extended subscription functionality. */ public synchronized void addSubscribedTopic(String pubSubName, String topicName, String match, int priority, String route, - Map metadata) { + Map metadata) { this.addSubscribedTopic(pubSubName, topicName, match, priority, route, metadata, null); } /** * Adds a topic to the list of subscribed topics. * - * @param pubSubName PubSub name to subscribe to. - * @param topicName Name of the topic being subscribed to. - * @param match Match expression for this route. - * @param priority Priority for this match relative to others. - * @param route Destination route for requests. - * @param metadata Metadata for extended subscription functionality. + * @param pubSubName PubSub name to subscribe to. + * @param topicName Name of the topic being subscribed to. + * @param match Match expression for this route. + * @param priority Priority for this match relative to others. + * @param route Destination route for requests. + * @param metadata Metadata for extended subscription functionality. * @param bulkSubscribe Bulk subscribe configuration. */ public synchronized void addSubscribedTopic(String pubSubName, @@ -91,7 +92,53 @@ public synchronized void addSubscribedTopic(String pubSubName, String match, int priority, String route, - Map metadata, + Map metadata, + DaprTopicBulkSubscribe bulkSubscribe) { + this.addSubscribedTopic(pubSubName, topicName, match, priority, route, null, + metadata, bulkSubscribe); + } + + /** + * Adds a topic to the list of subscribed topics. + * + * @param pubSubName PubSub name to subscribe to. + * @param topicName Name of the topic being subscribed to. + * @param match Match expression for this route. + * @param priority Priority for this match relative to others. + * @param route Destination route for requests. + * @param deadLetterTopic Name of topic to forward undeliverable messages. + * @param metadata Metadata for extended subscription functionality. + */ + public synchronized void addSubscribedTopic(String pubSubName, + String topicName, + String match, + int priority, + String route, + String deadLetterTopic, + Map metadata) { + this.addSubscribedTopic(pubSubName, topicName, match, priority, route, deadLetterTopic, + metadata, null); + } + + /** + * Adds a topic to the list of subscribed topics. + * + * @param pubSubName PubSub name to subscribe to. + * @param topicName Name of the topic being subscribed to. + * @param match Match expression for this route. + * @param priority Priority for this match relative to others. + * @param route Destination route for requests. + * @param deadLetterTopic Name of topic to forward undeliverable messages. + * @param metadata Metadata for extended subscription functionality. + * @param bulkSubscribe Bulk subscribe configuration. + */ + public synchronized void addSubscribedTopic(String pubSubName, + String topicName, + String match, + int priority, + String route, + String deadLetterTopic, + Map metadata, DaprTopicBulkSubscribe bulkSubscribe) { DaprTopicKey topicKey = new DaprTopicKey(pubSubName, topicName); @@ -111,6 +158,10 @@ public synchronized void addSubscribedTopic(String pubSubName, builder.setMetadata(metadata); } + if (deadLetterTopic != null && !deadLetterTopic.isEmpty()) { + builder.setDeadLetterTopic(deadLetterTopic); + } + if (bulkSubscribe != null) { builder.setBulkSubscribe(bulkSubscribe); } @@ -118,7 +169,7 @@ public synchronized void addSubscribedTopic(String pubSubName, public synchronized DaprTopicSubscription[] listSubscribedTopics() { List values = subscriptionBuilders.values().stream() - .map(b -> b.build()).collect(Collectors.toList()); + .map(b -> b.build()).collect(Collectors.toList()); return values.toArray(new DaprTopicSubscription[0]); } } diff --git a/sdk-springboot/src/main/java/io/dapr/springboot/DaprSubscriptionBuilder.java b/sdk-springboot/src/main/java/io/dapr/springboot/DaprSubscriptionBuilder.java index 1dbbf8528..493883d33 100644 --- a/sdk-springboot/src/main/java/io/dapr/springboot/DaprSubscriptionBuilder.java +++ b/sdk-springboot/src/main/java/io/dapr/springboot/DaprSubscriptionBuilder.java @@ -21,9 +21,11 @@ import java.util.stream.Collectors; class DaprSubscriptionBuilder { + private final String pubsubName; private final String topic; private final List rules; + private String deadLetterTopic; private String defaultPath; private Map metadata; @@ -31,19 +33,22 @@ class DaprSubscriptionBuilder { /** * Create a subscription topic. + * * @param pubsubName The pubsub name to subscribe to. - * @param topic The topic to subscribe to. + * @param topic The topic to subscribe to. */ DaprSubscriptionBuilder(String pubsubName, String topic) { this.pubsubName = pubsubName; this.topic = topic; this.rules = new ArrayList<>(); + this.deadLetterTopic = null; this.defaultPath = null; this.metadata = Collections.emptyMap(); } /** * Sets the default path for the subscription. + * * @param path The default path. * @return this instance. */ @@ -51,28 +56,48 @@ DaprSubscriptionBuilder setDefaultPath(String path) { if (defaultPath != null) { if (!defaultPath.equals(path)) { throw new RuntimeException( - String.format( - "a default route is already set for topic %s on pubsub %s (current: '%s', supplied: '%s')", - this.topic, this.pubsubName, this.defaultPath, path)); + String.format( + "a default route is already set for topic %s on pubsub %s (current: '%s', supplied: '%s')", + this.topic, this.pubsubName, this.defaultPath, path)); } } defaultPath = path; return this; } + /** + * Sets the dead letter topic for the subscription. + * + * @param deadLetterTopic Name of dead letter topic. + * @return this instance. + */ + DaprSubscriptionBuilder setDeadLetterTopic(String deadLetterTopic) { + if (this.deadLetterTopic != null) { + if (!this.deadLetterTopic.equals(deadLetterTopic)) { + throw new RuntimeException( + String.format( + "a default dead letter topic is already set for topic %s on pubsub %s (current: '%s', supplied: '%s')", + this.topic, this.pubsubName, this.deadLetterTopic, deadLetterTopic)); + } + } + this.deadLetterTopic = deadLetterTopic; + return this; + } + /** * Adds a rule to the subscription. - * @param path The path to route to. - * @param match The CEL expression the event must match. + * + * @param path The path to route to. + * @param match The CEL expression the event must match. * @param priority The priority of the rule. * @return this instance. */ public DaprSubscriptionBuilder addRule(String path, String match, int priority) { if (rules.stream().anyMatch(e -> e.getPriority() == priority)) { throw new RuntimeException( - String.format( - "a rule priority of %d is already used for topic %s on pubsub %s", - priority, this.topic, this.pubsubName)); + String.format( + "a rule priority of %d is already used for topic %s on pubsub %s", + priority, this.topic, this.pubsubName)); } rules.add(new TopicRule(path, match, priority)); return this; @@ -80,6 +105,7 @@ public DaprSubscriptionBuilder addRule(String path, String match, int priority) /** * Sets the metadata for the subscription. + * * @param metadata The metadata. * @return this instance. */ @@ -90,6 +116,7 @@ public DaprSubscriptionBuilder setMetadata(Map metadata) { /** * Sets the bulkSubscribe configuration for the subscription. + * * @param bulkSubscribe The bulk subscribe configuration. * @return this instance. */ @@ -100,6 +127,7 @@ public DaprSubscriptionBuilder setBulkSubscribe(DaprTopicBulkSubscribe bulkSubsc /** * Builds the DaprTopicSubscription that is returned by the application to Dapr. + * * @return The DaprTopicSubscription. */ public DaprTopicSubscription build() { @@ -109,16 +137,19 @@ public DaprTopicSubscription build() { if (!rules.isEmpty()) { Collections.sort(rules, Comparator.comparingInt(TopicRule::getPriority)); List topicRules = rules.stream() - .map(e -> new DaprTopicRule(e.match, e.path)).collect(Collectors.toList()); + .map(e -> new DaprTopicRule(e.match, e.path)).collect(Collectors.toList()); routes = new DaprTopicRoutes(topicRules, defaultPath); } else { route = defaultPath; } - return new DaprTopicSubscription(this.pubsubName, this.topic, route, routes, metadata, bulkSubscribe); + return new DaprTopicSubscription(this.pubsubName, this.topic, route, this.deadLetterTopic, + routes, metadata, + bulkSubscribe); } private static class TopicRule { + private final String path; private final String match; private final int priority; diff --git a/sdk-springboot/src/main/java/io/dapr/springboot/DaprTopicSubscription.java b/sdk-springboot/src/main/java/io/dapr/springboot/DaprTopicSubscription.java index 6f9acee93..6d9ee0d3d 100644 --- a/sdk-springboot/src/main/java/io/dapr/springboot/DaprTopicSubscription.java +++ b/sdk-springboot/src/main/java/io/dapr/springboot/DaprTopicSubscription.java @@ -23,6 +23,7 @@ class DaprTopicSubscription { private final String pubsubName; private final String topic; private final String route; + private final String deadLetterTopic; private final DaprTopicRoutes routes; private final Map metadata; private final DaprTopicBulkSubscribe bulkSubscribe; @@ -32,7 +33,7 @@ class DaprTopicSubscription { * @param pubsubName The pubsub name to subscribe to. * @param topic The topic to subscribe to. * @param route Destination route for messages. - * @param metadata Metdata for extended subscription functionality. + * @param metadata Metadata for extended subscription functionality. */ DaprTopicSubscription(String pubsubName, String topic, String route, Map metadata) { this(pubsubName, topic, route, metadata, null); @@ -43,11 +44,39 @@ class DaprTopicSubscription { * @param pubsubName The pubsub name to subscribe to. * @param topic The topic to subscribe to. * @param route Destination route for messages. - * @param metadata Metdata for extended subscription functionality. + * @param deadLetterTopic Name of topic to forward undeliverable messages. + * @param metadata Metadata for extended subscription functionality. + */ + DaprTopicSubscription(String pubsubName, String topic, String route, String deadLetterTopic, + Map metadata) { + this(pubsubName, topic, route, deadLetterTopic, null, metadata, null); + } + + /** + * Create a subscription topic. + * @param pubsubName The pubsub name to subscribe to. + * @param topic The topic to subscribe to. + * @param route Destination route for messages. + * @param metadata Metadata for extended subscription functionality. + * @param bulkSubscribe Bulk subscribe configuration. */ DaprTopicSubscription(String pubsubName, String topic, String route, - Map metadata, DaprTopicBulkSubscribe bulkSubscribe) { - this(pubsubName, topic, route, null, metadata, bulkSubscribe); + Map metadata, DaprTopicBulkSubscribe bulkSubscribe) { + this(pubsubName, topic, route, "", null, metadata, bulkSubscribe); + } + + /** + * Create a subscription topic. + * @param pubsubName The pubsub name to subscribe to. + * @param topic The topic to subscribe to. + * @param route Destination route for messages. + * @param deadLetterTopic Name of topic to forward undeliverable messages. + * @param metadata Metadata for extended subscription functionality. + * @param bulkSubscribe Bulk subscribe configuration. + */ + DaprTopicSubscription(String pubsubName, String topic, String route, String deadLetterTopic, + Map metadata, DaprTopicBulkSubscribe bulkSubscribe) { + this(pubsubName, topic, route, deadLetterTopic, null, metadata, bulkSubscribe); } /** @@ -59,8 +88,8 @@ class DaprTopicSubscription { * @param metadata Metadata for extended subscription functionality. */ DaprTopicSubscription(String pubsubName, String topic, String route, DaprTopicRoutes routes, - Map metadata) { - this(pubsubName, topic, route, routes, metadata, null); + Map metadata) { + this(pubsubName, topic, route, "", routes, metadata, null); } /** @@ -68,16 +97,53 @@ class DaprTopicSubscription { * @param pubsubName The pubsub name to subscribe to. * @param topic The topic to subscribe to. * @param route Destination route for messages. + * @param deadLetterTopic Name of topic to forward undeliverable messages. * @param routes Destination routes with rules for messages. * @param metadata Metadata for extended subscription functionality. - * @param bulkSubscribe Bulk subscribe configuration. */ - DaprTopicSubscription(String pubsubName, String topic, String route, DaprTopicRoutes routes, - Map metadata, DaprTopicBulkSubscribe bulkSubscribe) { + DaprTopicSubscription(String pubsubName, String topic, String route, String deadLetterTopic, DaprTopicRoutes routes, + Map metadata) { + this(pubsubName, topic, route, deadLetterTopic, routes, metadata, null); + } + + /** + * Create a subscription topic. + * + * @param pubsubName The pubsub name to subscribe to. + * @param topic The topic to subscribe to. + * @param route Destination route for messages. + * @param routes Destination routes with rules for messages. + * @param metadata Metadata for extended subscription functionality. + * @param bulkSubscribe Bulk subscribe configuration. + */ + DaprTopicSubscription(String pubsubName, String topic, String route, + DaprTopicRoutes routes, + Map metadata, + DaprTopicBulkSubscribe bulkSubscribe) { + this(pubsubName, topic, route, "", routes, metadata, bulkSubscribe); + } + + + /** + * Create a subscription topic. + * + * @param pubsubName The pubsub name to subscribe to. + * @param topic The topic to subscribe to. + * @param route Destination route for messages. + * @param deadLetterTopic Name of topic to forward undeliverable messages. + * @param routes Destination routes with rules for messages. + * @param metadata Metadata for extended subscription functionality. + * @param bulkSubscribe Bulk subscribe configuration. + */ + DaprTopicSubscription(String pubsubName, String topic, String route, String deadLetterTopic, + DaprTopicRoutes routes, + Map metadata, + DaprTopicBulkSubscribe bulkSubscribe) { this.pubsubName = pubsubName; this.topic = topic; this.route = route; this.routes = routes; + this.deadLetterTopic = deadLetterTopic; this.metadata = Collections.unmodifiableMap(metadata); this.bulkSubscribe = bulkSubscribe; } @@ -98,6 +164,10 @@ public DaprTopicRoutes getRoutes() { return routes; } + public String getDeadLetterTopic() { + return deadLetterTopic; + } + public Map getMetadata() { return metadata; } diff --git a/sdk-springboot/src/test/java/io/dapr/springboot/DaprBeanPostProcessorSubscribeTest.java b/sdk-springboot/src/test/java/io/dapr/springboot/DaprBeanPostProcessorSubscribeTest.java index 5010d3310..33303435f 100644 --- a/sdk-springboot/src/test/java/io/dapr/springboot/DaprBeanPostProcessorSubscribeTest.java +++ b/sdk-springboot/src/test/java/io/dapr/springboot/DaprBeanPostProcessorSubscribeTest.java @@ -74,6 +74,7 @@ private DaprTopicSubscription[] getTestDaprTopicSubscriptions() { MockControllerWithSubscribe.pubSubName, MockControllerWithSubscribe.topicName, MockControllerWithSubscribe.subscribeRoute, + MockControllerWithSubscribe.deadLetterTopic, new HashMap<>()); DaprTopicBulkSubscribe bulkSubscribe = new DaprTopicBulkSubscribe(true); @@ -84,6 +85,7 @@ private DaprTopicSubscription[] getTestDaprTopicSubscriptions() { MockControllerWithSubscribe.pubSubName, MockControllerWithSubscribe.bulkTopicName, MockControllerWithSubscribe.bulkSubscribeRoute, + MockControllerWithSubscribe.deadLetterTopic, new HashMap<>(), bulkSubscribe); diff --git a/sdk-springboot/src/test/java/io/dapr/springboot/DaprRuntimeTest.java b/sdk-springboot/src/test/java/io/dapr/springboot/DaprRuntimeTest.java index 0488ef299..e49a5bcda 100644 --- a/sdk-springboot/src/test/java/io/dapr/springboot/DaprRuntimeTest.java +++ b/sdk-springboot/src/test/java/io/dapr/springboot/DaprRuntimeTest.java @@ -13,6 +13,7 @@ public class DaprRuntimeTest { public void testPubsubDefaultPathDuplicateRegistration() { String pubSubName = "pubsub"; String topicName = "topic"; + String deadLetterTopic = "deadLetterTopic"; String match = ""; String route = String.format("%s/%s", pubSubName, topicName); HashMap metadata = new HashMap(); @@ -36,15 +37,16 @@ public int priority() { // We should be able to register the same route multiple times runtime.addSubscribedTopic( - pubSubName, topicName, match, rule.priority(), route, metadata); + pubSubName, topicName, match, rule.priority(), route,deadLetterTopic, metadata); runtime.addSubscribedTopic( - pubSubName, topicName, match, rule.priority(), route, metadata); + pubSubName, topicName, match, rule.priority(), route,deadLetterTopic, metadata); } @Test(expected = RuntimeException.class) public void testPubsubDefaultPathDifferentRegistration() { String pubSubName = "pubsub"; String topicName = "topic"; + String deadLetterTopic = "deadLetterTopic"; String match = ""; String firstRoute = String.format("%s/%s", pubSubName, topicName); String secondRoute = String.format("%s/%s/subscribe", pubSubName, topicName); @@ -70,11 +72,11 @@ public int priority() { Assert.assertNotNull(runtime); runtime.addSubscribedTopic( - pubSubName, topicName, match, rule.priority(), firstRoute, metadata); + pubSubName, topicName, match, rule.priority(), firstRoute, deadLetterTopic, metadata); // Supplying the same pubsub bits but a different route should fail runtime.addSubscribedTopic( - pubSubName, topicName, match, rule.priority(), secondRoute, metadata); + pubSubName, topicName, match, rule.priority(), secondRoute, deadLetterTopic, metadata); } diff --git a/sdk-springboot/src/test/java/io/dapr/springboot/MockControllerWithSubscribe.java b/sdk-springboot/src/test/java/io/dapr/springboot/MockControllerWithSubscribe.java index a0ae129cc..0a335f105 100644 --- a/sdk-springboot/src/test/java/io/dapr/springboot/MockControllerWithSubscribe.java +++ b/sdk-springboot/src/test/java/io/dapr/springboot/MockControllerWithSubscribe.java @@ -20,6 +20,7 @@ public class MockControllerWithSubscribe { public static final String pubSubName = "mockPubSub"; public static final String topicName = "mockTopic"; + public static final String deadLetterTopic = "deadLetterTopic"; public static final String bulkTopicName = "mockBulkTopic"; public static final String bulkTopicNameV2 = "mockBulkTopicV2"; public static final String subscribeRoute = "mockRoute"; @@ -27,12 +28,12 @@ public class MockControllerWithSubscribe { public static final int maxMessagesCount = 500; public static final int maxAwaitDurationMs = 1000; - @Topic(name = topicName, pubsubName = pubSubName) + @Topic(name = topicName, pubsubName = pubSubName, deadLetterTopic = deadLetterTopic) @PostMapping(path = subscribeRoute) public void handleMessages() {} @BulkSubscribe(maxMessagesCount = maxMessagesCount, maxAwaitDurationMs = maxAwaitDurationMs) - @Topic(name = bulkTopicName, pubsubName = pubSubName) + @Topic(name = bulkTopicName, pubsubName = pubSubName,deadLetterTopic = deadLetterTopic) @PostMapping(path = bulkSubscribeRoute) public void handleBulkMessages() {} } diff --git a/sdk/src/main/java/io/dapr/Topic.java b/sdk/src/main/java/io/dapr/Topic.java index 2cd7624b0..22c98074f 100644 --- a/sdk/src/main/java/io/dapr/Topic.java +++ b/sdk/src/main/java/io/dapr/Topic.java @@ -50,4 +50,10 @@ * @return metadata object */ String metadata() default "{}"; + + /** + * Name of dead letter topic to forward undeliverable messages. + * @return dead letter topic's name. + */ + String deadLetterTopic() default ""; }