diff --git a/sdk-springboot/src/main/java/io/dapr/springboot/DaprBeanPostProcessor.java b/sdk-springboot/src/main/java/io/dapr/springboot/DaprBeanPostProcessor.java index 928102d21c..ab86256e55 100644 --- a/sdk-springboot/src/main/java/io/dapr/springboot/DaprBeanPostProcessor.java +++ b/sdk-springboot/src/main/java/io/dapr/springboot/DaprBeanPostProcessor.java @@ -107,6 +107,7 @@ private static void subscribeToTopics( Rule rule = topic.rule(); String topicName = stringValueResolver.resolveStringValue(topic.name()); String pubSubName = stringValueResolver.resolveStringValue(topic.pubsubName()); + String deadLetterTopic = stringValueResolver.resolveStringValue(topic.deadLetterTopic()); String match = stringValueResolver.resolveStringValue(rule.match()); if ((topicName != null) && (topicName.length() > 0) && pubSubName != null && pubSubName.length() > 0) { try { @@ -116,7 +117,7 @@ private static void subscribeToTopics( List routes = getAllCompleteRoutesForPost(clazz, method, topicName); for (String route : routes) { daprRuntime.addSubscribedTopic( - pubSubName, topicName, match, rule.priority(), route, metadata, bulkSubscribe); + pubSubName, topicName, match, rule.priority(), route,deadLetterTopic, metadata, bulkSubscribe); } } catch (JsonProcessingException e) { throw new IllegalArgumentException("Error while parsing metadata: " + e); diff --git a/sdk-springboot/src/main/java/io/dapr/springboot/DaprRuntime.java b/sdk-springboot/src/main/java/io/dapr/springboot/DaprRuntime.java index dff215b213..d45ad46ec3 100644 --- a/sdk-springboot/src/main/java/io/dapr/springboot/DaprRuntime.java +++ b/sdk-springboot/src/main/java/io/dapr/springboot/DaprRuntime.java @@ -22,6 +22,7 @@ * Internal Singleton to handle Dapr configuration. */ class DaprRuntime { + /** * The singleton instance. */ @@ -33,8 +34,8 @@ class DaprRuntime { private final Map subscriptionBuilders = new HashMap<>(); /** - * DaprRuntime should be used as a singleton, using {@link DaprRuntime#getInstance()}. - * The constructor's default scope is available for unit tests only. + * DaprRuntime should be used as a singleton, using {@link DaprRuntime#getInstance()}. The + * constructor's default scope is available for unit tests only. */ private DaprRuntime() { } @@ -59,40 +60,45 @@ public static DaprRuntime getInstance() { /** * Adds a topic to the list of subscribed topics. * - * @param pubSubName PubSub name to subscribe to. - * @param topicName Name of the topic being subscribed to. - * @param match Match expression for this route. - * @param priority Priority for this match relative to others. - * @param route Destination route for requests. - * @param metadata Metadata for extended subscription functionality. + * @param pubSubName PubSub name to subscribe to. + * @param topicName Name of the topic being subscribed to. + * @param match Match expression for this route. + * @param priority Priority for this match relative to others. + * @param route Destination route for requests. + * @param deadLetterTopic Name of topic to forward undeliverable messages. + * @param metadata Metadata for extended subscription functionality. */ public synchronized void addSubscribedTopic(String pubSubName, - String topicName, - String match, - int priority, - String route, - Map metadata) { - this.addSubscribedTopic(pubSubName, topicName, match, priority, route, metadata, null); + String topicName, + String match, + int priority, + String route, + String deadLetterTopic, + Map metadata) { + this.addSubscribedTopic(pubSubName, topicName, match, priority, route, deadLetterTopic, + metadata, null); } /** * Adds a topic to the list of subscribed topics. * - * @param pubSubName PubSub name to subscribe to. - * @param topicName Name of the topic being subscribed to. - * @param match Match expression for this route. - * @param priority Priority for this match relative to others. - * @param route Destination route for requests. - * @param metadata Metadata for extended subscription functionality. - * @param bulkSubscribe Bulk subscribe configuration. + * @param pubSubName PubSub name to subscribe to. + * @param topicName Name of the topic being subscribed to. + * @param match Match expression for this route. + * @param priority Priority for this match relative to others. + * @param route Destination route for requests. + * @param deadLetterTopic Name of topic to forward undeliverable messages. + * @param metadata Metadata for extended subscription functionality. + * @param bulkSubscribe Bulk subscribe configuration. */ public synchronized void addSubscribedTopic(String pubSubName, - String topicName, - String match, - int priority, - String route, - Map metadata, - DaprTopicBulkSubscribe bulkSubscribe) { + String topicName, + String match, + int priority, + String route, + String deadLetterTopic, + Map metadata, + DaprTopicBulkSubscribe bulkSubscribe) { DaprTopicKey topicKey = new DaprTopicKey(pubSubName, topicName); DaprSubscriptionBuilder builder = subscriptionBuilders.get(topicKey); @@ -111,6 +117,10 @@ public synchronized void addSubscribedTopic(String pubSubName, builder.setMetadata(metadata); } + if (deadLetterTopic != null && !deadLetterTopic.isEmpty()) { + builder.setDeadLetterTopic(deadLetterTopic); + } + if (bulkSubscribe != null) { builder.setBulkSubscribe(bulkSubscribe); } @@ -118,7 +128,7 @@ public synchronized void addSubscribedTopic(String pubSubName, public synchronized DaprTopicSubscription[] listSubscribedTopics() { List values = subscriptionBuilders.values().stream() - .map(b -> b.build()).collect(Collectors.toList()); + .map(b -> b.build()).collect(Collectors.toList()); return values.toArray(new DaprTopicSubscription[0]); } } diff --git a/sdk-springboot/src/main/java/io/dapr/springboot/DaprSubscriptionBuilder.java b/sdk-springboot/src/main/java/io/dapr/springboot/DaprSubscriptionBuilder.java index 1dbbf85287..493883d33d 100644 --- a/sdk-springboot/src/main/java/io/dapr/springboot/DaprSubscriptionBuilder.java +++ b/sdk-springboot/src/main/java/io/dapr/springboot/DaprSubscriptionBuilder.java @@ -21,9 +21,11 @@ import java.util.stream.Collectors; class DaprSubscriptionBuilder { + private final String pubsubName; private final String topic; private final List rules; + private String deadLetterTopic; private String defaultPath; private Map metadata; @@ -31,19 +33,22 @@ class DaprSubscriptionBuilder { /** * Create a subscription topic. + * * @param pubsubName The pubsub name to subscribe to. - * @param topic The topic to subscribe to. + * @param topic The topic to subscribe to. */ DaprSubscriptionBuilder(String pubsubName, String topic) { this.pubsubName = pubsubName; this.topic = topic; this.rules = new ArrayList<>(); + this.deadLetterTopic = null; this.defaultPath = null; this.metadata = Collections.emptyMap(); } /** * Sets the default path for the subscription. + * * @param path The default path. * @return this instance. */ @@ -51,28 +56,48 @@ DaprSubscriptionBuilder setDefaultPath(String path) { if (defaultPath != null) { if (!defaultPath.equals(path)) { throw new RuntimeException( - String.format( - "a default route is already set for topic %s on pubsub %s (current: '%s', supplied: '%s')", - this.topic, this.pubsubName, this.defaultPath, path)); + String.format( + "a default route is already set for topic %s on pubsub %s (current: '%s', supplied: '%s')", + this.topic, this.pubsubName, this.defaultPath, path)); } } defaultPath = path; return this; } + /** + * Sets the dead letter topic for the subscription. + * + * @param deadLetterTopic Name of dead letter topic. + * @return this instance. + */ + DaprSubscriptionBuilder setDeadLetterTopic(String deadLetterTopic) { + if (this.deadLetterTopic != null) { + if (!this.deadLetterTopic.equals(deadLetterTopic)) { + throw new RuntimeException( + String.format( + "a default dead letter topic is already set for topic %s on pubsub %s (current: '%s', supplied: '%s')", + this.topic, this.pubsubName, this.deadLetterTopic, deadLetterTopic)); + } + } + this.deadLetterTopic = deadLetterTopic; + return this; + } + /** * Adds a rule to the subscription. - * @param path The path to route to. - * @param match The CEL expression the event must match. + * + * @param path The path to route to. + * @param match The CEL expression the event must match. * @param priority The priority of the rule. * @return this instance. */ public DaprSubscriptionBuilder addRule(String path, String match, int priority) { if (rules.stream().anyMatch(e -> e.getPriority() == priority)) { throw new RuntimeException( - String.format( - "a rule priority of %d is already used for topic %s on pubsub %s", - priority, this.topic, this.pubsubName)); + String.format( + "a rule priority of %d is already used for topic %s on pubsub %s", + priority, this.topic, this.pubsubName)); } rules.add(new TopicRule(path, match, priority)); return this; @@ -80,6 +105,7 @@ public DaprSubscriptionBuilder addRule(String path, String match, int priority) /** * Sets the metadata for the subscription. + * * @param metadata The metadata. * @return this instance. */ @@ -90,6 +116,7 @@ public DaprSubscriptionBuilder setMetadata(Map metadata) { /** * Sets the bulkSubscribe configuration for the subscription. + * * @param bulkSubscribe The bulk subscribe configuration. * @return this instance. */ @@ -100,6 +127,7 @@ public DaprSubscriptionBuilder setBulkSubscribe(DaprTopicBulkSubscribe bulkSubsc /** * Builds the DaprTopicSubscription that is returned by the application to Dapr. + * * @return The DaprTopicSubscription. */ public DaprTopicSubscription build() { @@ -109,16 +137,19 @@ public DaprTopicSubscription build() { if (!rules.isEmpty()) { Collections.sort(rules, Comparator.comparingInt(TopicRule::getPriority)); List topicRules = rules.stream() - .map(e -> new DaprTopicRule(e.match, e.path)).collect(Collectors.toList()); + .map(e -> new DaprTopicRule(e.match, e.path)).collect(Collectors.toList()); routes = new DaprTopicRoutes(topicRules, defaultPath); } else { route = defaultPath; } - return new DaprTopicSubscription(this.pubsubName, this.topic, route, routes, metadata, bulkSubscribe); + return new DaprTopicSubscription(this.pubsubName, this.topic, route, this.deadLetterTopic, + routes, metadata, + bulkSubscribe); } private static class TopicRule { + private final String path; private final String match; private final int priority; diff --git a/sdk-springboot/src/main/java/io/dapr/springboot/DaprTopicSubscription.java b/sdk-springboot/src/main/java/io/dapr/springboot/DaprTopicSubscription.java index 6f9acee93c..3c2eef2f98 100644 --- a/sdk-springboot/src/main/java/io/dapr/springboot/DaprTopicSubscription.java +++ b/sdk-springboot/src/main/java/io/dapr/springboot/DaprTopicSubscription.java @@ -20,64 +20,78 @@ * Class to represent a subscription topic along with its metadata. */ class DaprTopicSubscription { + private final String pubsubName; private final String topic; private final String route; + private final String deadLetterTopic; private final DaprTopicRoutes routes; private final Map metadata; private final DaprTopicBulkSubscribe bulkSubscribe; /** * Create a subscription topic. - * @param pubsubName The pubsub name to subscribe to. - * @param topic The topic to subscribe to. - * @param route Destination route for messages. - * @param metadata Metdata for extended subscription functionality. + * + * @param pubsubName The pubsub name to subscribe to. + * @param topic The topic to subscribe to. + * @param route Destination route for messages. + * @param deadLetterTopic Name of topic to forward undeliverable messages. + * @param metadata Metdata for extended subscription functionality. */ - DaprTopicSubscription(String pubsubName, String topic, String route, Map metadata) { - this(pubsubName, topic, route, metadata, null); + DaprTopicSubscription(String pubsubName, String topic, String route, String deadLetterTopic, + Map metadata) { + this(pubsubName, topic, route, deadLetterTopic, metadata, null); } /** * Create a subscription topic. - * @param pubsubName The pubsub name to subscribe to. - * @param topic The topic to subscribe to. - * @param route Destination route for messages. - * @param metadata Metdata for extended subscription functionality. + * + * @param pubsubName The pubsub name to subscribe to. + * @param topic The topic to subscribe to. + * @param route Destination route for messages. + * @param deadLetterTopic Name of topic to forward undeliverable messages. + * @param metadata Metadata for extended subscription functionality. */ - DaprTopicSubscription(String pubsubName, String topic, String route, - Map metadata, DaprTopicBulkSubscribe bulkSubscribe) { - this(pubsubName, topic, route, null, metadata, bulkSubscribe); + DaprTopicSubscription(String pubsubName, String topic, String route, String deadLetterTopic, + Map metadata, DaprTopicBulkSubscribe bulkSubscribe) { + this(pubsubName, topic, route, deadLetterTopic,null, metadata, bulkSubscribe); } /** * Create a subscription topic. - * @param pubsubName The pubsub name to subscribe to. - * @param topic The topic to subscribe to. - * @param route Destination route for messages. - * @param routes Destination routes with rules for messages. - * @param metadata Metadata for extended subscription functionality. + * + * @param pubsubName The pubsub name to subscribe to. + * @param topic The topic to subscribe to. + * @param route Destination route for messages. + * @param deadLetterTopic Name of topic to forward undeliverable messages. + * @param routes Destination routes with rules for messages. + * @param metadata Metadata for extended subscription functionality. */ - DaprTopicSubscription(String pubsubName, String topic, String route, DaprTopicRoutes routes, - Map metadata) { - this(pubsubName, topic, route, routes, metadata, null); + DaprTopicSubscription(String pubsubName, String topic, String route, String deadLetterTopic, + DaprTopicRoutes routes, + Map metadata) { + this(pubsubName, topic, route, deadLetterTopic, routes, metadata, null); } /** * Create a subscription topic. - * @param pubsubName The pubsub name to subscribe to. - * @param topic The topic to subscribe to. - * @param route Destination route for messages. - * @param routes Destination routes with rules for messages. - * @param metadata Metadata for extended subscription functionality. - * @param bulkSubscribe Bulk subscribe configuration. + * + * @param pubsubName The pubsub name to subscribe to. + * @param topic The topic to subscribe to. + * @param route Destination route for messages. + * @param deadLetterTopic Name of topic to forward undeliverable messages. + * @param routes Destination routes with rules for messages. + * @param metadata Metadata for extended subscription functionality. + * @param bulkSubscribe Bulk subscribe configuration. */ - DaprTopicSubscription(String pubsubName, String topic, String route, DaprTopicRoutes routes, - Map metadata, DaprTopicBulkSubscribe bulkSubscribe) { + DaprTopicSubscription(String pubsubName, String topic, String route, String deadLetterTopic, + DaprTopicRoutes routes, + Map metadata, DaprTopicBulkSubscribe bulkSubscribe) { this.pubsubName = pubsubName; this.topic = topic; this.route = route; this.routes = routes; + this.deadLetterTopic = deadLetterTopic; this.metadata = Collections.unmodifiableMap(metadata); this.bulkSubscribe = bulkSubscribe; } @@ -94,6 +108,10 @@ public String getRoute() { return route; } + public String getDeadLetterTopic() { + return deadLetterTopic; + } + public DaprTopicRoutes getRoutes() { return routes; } diff --git a/sdk-springboot/src/test/java/io/dapr/springboot/DaprBeanPostProcessorSubscribeTest.java b/sdk-springboot/src/test/java/io/dapr/springboot/DaprBeanPostProcessorSubscribeTest.java index 5010d33105..33303435f5 100644 --- a/sdk-springboot/src/test/java/io/dapr/springboot/DaprBeanPostProcessorSubscribeTest.java +++ b/sdk-springboot/src/test/java/io/dapr/springboot/DaprBeanPostProcessorSubscribeTest.java @@ -74,6 +74,7 @@ private DaprTopicSubscription[] getTestDaprTopicSubscriptions() { MockControllerWithSubscribe.pubSubName, MockControllerWithSubscribe.topicName, MockControllerWithSubscribe.subscribeRoute, + MockControllerWithSubscribe.deadLetterTopic, new HashMap<>()); DaprTopicBulkSubscribe bulkSubscribe = new DaprTopicBulkSubscribe(true); @@ -84,6 +85,7 @@ private DaprTopicSubscription[] getTestDaprTopicSubscriptions() { MockControllerWithSubscribe.pubSubName, MockControllerWithSubscribe.bulkTopicName, MockControllerWithSubscribe.bulkSubscribeRoute, + MockControllerWithSubscribe.deadLetterTopic, new HashMap<>(), bulkSubscribe); diff --git a/sdk-springboot/src/test/java/io/dapr/springboot/DaprRuntimeTest.java b/sdk-springboot/src/test/java/io/dapr/springboot/DaprRuntimeTest.java index 0488ef2998..dd3cdf8418 100644 --- a/sdk-springboot/src/test/java/io/dapr/springboot/DaprRuntimeTest.java +++ b/sdk-springboot/src/test/java/io/dapr/springboot/DaprRuntimeTest.java @@ -13,6 +13,7 @@ public class DaprRuntimeTest { public void testPubsubDefaultPathDuplicateRegistration() { String pubSubName = "pubsub"; String topicName = "topic"; + String deadLetterTopic = "deadLetterTopic"; String match = ""; String route = String.format("%s/%s", pubSubName, topicName); HashMap metadata = new HashMap(); @@ -36,15 +37,16 @@ public int priority() { // We should be able to register the same route multiple times runtime.addSubscribedTopic( - pubSubName, topicName, match, rule.priority(), route, metadata); + pubSubName, topicName, match, rule.priority(), route,deadLetterTopic, metadata); runtime.addSubscribedTopic( - pubSubName, topicName, match, rule.priority(), route, metadata); + pubSubName, topicName, match, rule.priority(), route,deadLetterTopic, metadata); } @Test(expected = RuntimeException.class) public void testPubsubDefaultPathDifferentRegistration() { String pubSubName = "pubsub"; String topicName = "topic"; + String deadLetterTopic = "deadLetterTopic"; String match = ""; String firstRoute = String.format("%s/%s", pubSubName, topicName); String secondRoute = String.format("%s/%s/subscribe", pubSubName, topicName); @@ -70,11 +72,11 @@ public int priority() { Assert.assertNotNull(runtime); runtime.addSubscribedTopic( - pubSubName, topicName, match, rule.priority(), firstRoute, metadata); + pubSubName, topicName, match, rule.priority(), firstRoute,deadLetterTopic, metadata); // Supplying the same pubsub bits but a different route should fail runtime.addSubscribedTopic( - pubSubName, topicName, match, rule.priority(), secondRoute, metadata); + pubSubName, topicName, match, rule.priority(), secondRoute,deadLetterTopic, metadata); } diff --git a/sdk-springboot/src/test/java/io/dapr/springboot/MockControllerWithSubscribe.java b/sdk-springboot/src/test/java/io/dapr/springboot/MockControllerWithSubscribe.java index a0ae129cc1..0a335f105b 100644 --- a/sdk-springboot/src/test/java/io/dapr/springboot/MockControllerWithSubscribe.java +++ b/sdk-springboot/src/test/java/io/dapr/springboot/MockControllerWithSubscribe.java @@ -20,6 +20,7 @@ public class MockControllerWithSubscribe { public static final String pubSubName = "mockPubSub"; public static final String topicName = "mockTopic"; + public static final String deadLetterTopic = "deadLetterTopic"; public static final String bulkTopicName = "mockBulkTopic"; public static final String bulkTopicNameV2 = "mockBulkTopicV2"; public static final String subscribeRoute = "mockRoute"; @@ -27,12 +28,12 @@ public class MockControllerWithSubscribe { public static final int maxMessagesCount = 500; public static final int maxAwaitDurationMs = 1000; - @Topic(name = topicName, pubsubName = pubSubName) + @Topic(name = topicName, pubsubName = pubSubName, deadLetterTopic = deadLetterTopic) @PostMapping(path = subscribeRoute) public void handleMessages() {} @BulkSubscribe(maxMessagesCount = maxMessagesCount, maxAwaitDurationMs = maxAwaitDurationMs) - @Topic(name = bulkTopicName, pubsubName = pubSubName) + @Topic(name = bulkTopicName, pubsubName = pubSubName,deadLetterTopic = deadLetterTopic) @PostMapping(path = bulkSubscribeRoute) public void handleBulkMessages() {} } diff --git a/sdk/src/main/java/io/dapr/Topic.java b/sdk/src/main/java/io/dapr/Topic.java index 2cd7624b02..22c98074fc 100644 --- a/sdk/src/main/java/io/dapr/Topic.java +++ b/sdk/src/main/java/io/dapr/Topic.java @@ -50,4 +50,10 @@ * @return metadata object */ String metadata() default "{}"; + + /** + * Name of dead letter topic to forward undeliverable messages. + * @return dead letter topic's name. + */ + String deadLetterTopic() default ""; }