Skip to content

Commit

Permalink
feat: Add dead letter topic to a subscription topic.
Browse files Browse the repository at this point in the history
  • Loading branch information
charlieMk1 committed Apr 24, 2023
1 parent 7649ae4 commit 358af42
Show file tree
Hide file tree
Showing 8 changed files with 146 additions and 75 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -107,6 +107,7 @@ private static void subscribeToTopics(
Rule rule = topic.rule();
String topicName = stringValueResolver.resolveStringValue(topic.name());
String pubSubName = stringValueResolver.resolveStringValue(topic.pubsubName());
String deadLetterTopic = stringValueResolver.resolveStringValue(topic.deadLetterTopic());
String match = stringValueResolver.resolveStringValue(rule.match());
if ((topicName != null) && (topicName.length() > 0) && pubSubName != null && pubSubName.length() > 0) {
try {
Expand All @@ -116,7 +117,7 @@ private static void subscribeToTopics(
List<String> routes = getAllCompleteRoutesForPost(clazz, method, topicName);
for (String route : routes) {
daprRuntime.addSubscribedTopic(
pubSubName, topicName, match, rule.priority(), route, metadata, bulkSubscribe);
pubSubName, topicName, match, rule.priority(), route,deadLetterTopic, metadata, bulkSubscribe);
}
} catch (JsonProcessingException e) {
throw new IllegalArgumentException("Error while parsing metadata: " + e);
Expand Down
66 changes: 38 additions & 28 deletions sdk-springboot/src/main/java/io/dapr/springboot/DaprRuntime.java
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@
* Internal Singleton to handle Dapr configuration.
*/
class DaprRuntime {

/**
* The singleton instance.
*/
Expand All @@ -33,8 +34,8 @@ class DaprRuntime {
private final Map<DaprTopicKey, DaprSubscriptionBuilder> subscriptionBuilders = new HashMap<>();

/**
* DaprRuntime should be used as a singleton, using {@link DaprRuntime#getInstance()}.
* The constructor's default scope is available for unit tests only.
* DaprRuntime should be used as a singleton, using {@link DaprRuntime#getInstance()}. The
* constructor's default scope is available for unit tests only.
*/
private DaprRuntime() {
}
Expand All @@ -59,40 +60,45 @@ public static DaprRuntime getInstance() {
/**
* Adds a topic to the list of subscribed topics.
*
* @param pubSubName PubSub name to subscribe to.
* @param topicName Name of the topic being subscribed to.
* @param match Match expression for this route.
* @param priority Priority for this match relative to others.
* @param route Destination route for requests.
* @param metadata Metadata for extended subscription functionality.
* @param pubSubName PubSub name to subscribe to.
* @param topicName Name of the topic being subscribed to.
* @param match Match expression for this route.
* @param priority Priority for this match relative to others.
* @param route Destination route for requests.
* @param deadLetterTopic Name of topic to forward undeliverable messages.
* @param metadata Metadata for extended subscription functionality.
*/
public synchronized void addSubscribedTopic(String pubSubName,
String topicName,
String match,
int priority,
String route,
Map<String,String> metadata) {
this.addSubscribedTopic(pubSubName, topicName, match, priority, route, metadata, null);
String topicName,
String match,
int priority,
String route,
String deadLetterTopic,
Map<String, String> metadata) {
this.addSubscribedTopic(pubSubName, topicName, match, priority, route, deadLetterTopic,
metadata, null);
}

/**
* Adds a topic to the list of subscribed topics.
*
* @param pubSubName PubSub name to subscribe to.
* @param topicName Name of the topic being subscribed to.
* @param match Match expression for this route.
* @param priority Priority for this match relative to others.
* @param route Destination route for requests.
* @param metadata Metadata for extended subscription functionality.
* @param bulkSubscribe Bulk subscribe configuration.
* @param pubSubName PubSub name to subscribe to.
* @param topicName Name of the topic being subscribed to.
* @param match Match expression for this route.
* @param priority Priority for this match relative to others.
* @param route Destination route for requests.
* @param deadLetterTopic Name of topic to forward undeliverable messages.
* @param metadata Metadata for extended subscription functionality.
* @param bulkSubscribe Bulk subscribe configuration.
*/
public synchronized void addSubscribedTopic(String pubSubName,
String topicName,
String match,
int priority,
String route,
Map<String,String> metadata,
DaprTopicBulkSubscribe bulkSubscribe) {
String topicName,
String match,
int priority,
String route,
String deadLetterTopic,
Map<String, String> metadata,
DaprTopicBulkSubscribe bulkSubscribe) {
DaprTopicKey topicKey = new DaprTopicKey(pubSubName, topicName);

DaprSubscriptionBuilder builder = subscriptionBuilders.get(topicKey);
Expand All @@ -111,14 +117,18 @@ public synchronized void addSubscribedTopic(String pubSubName,
builder.setMetadata(metadata);
}

if (deadLetterTopic != null && !deadLetterTopic.isEmpty()) {
builder.setDeadLetterTopic(deadLetterTopic);
}

if (bulkSubscribe != null) {
builder.setBulkSubscribe(bulkSubscribe);
}
}

public synchronized DaprTopicSubscription[] listSubscribedTopics() {
List<DaprTopicSubscription> values = subscriptionBuilders.values().stream()
.map(b -> b.build()).collect(Collectors.toList());
.map(b -> b.build()).collect(Collectors.toList());
return values.toArray(new DaprTopicSubscription[0]);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -21,65 +21,91 @@
import java.util.stream.Collectors;

class DaprSubscriptionBuilder {

private final String pubsubName;
private final String topic;
private final List<TopicRule> rules;
private String deadLetterTopic;
private String defaultPath;
private Map<String, String> metadata;

private DaprTopicBulkSubscribe bulkSubscribe;

/**
* Create a subscription topic.
*
* @param pubsubName The pubsub name to subscribe to.
* @param topic The topic to subscribe to.
* @param topic The topic to subscribe to.
*/
DaprSubscriptionBuilder(String pubsubName, String topic) {
this.pubsubName = pubsubName;
this.topic = topic;
this.rules = new ArrayList<>();
this.deadLetterTopic = null;
this.defaultPath = null;
this.metadata = Collections.emptyMap();
}

/**
* Sets the default path for the subscription.
*
* @param path The default path.
* @return this instance.
*/
DaprSubscriptionBuilder setDefaultPath(String path) {
if (defaultPath != null) {
if (!defaultPath.equals(path)) {
throw new RuntimeException(
String.format(
"a default route is already set for topic %s on pubsub %s (current: '%s', supplied: '%s')",
this.topic, this.pubsubName, this.defaultPath, path));
String.format(
"a default route is already set for topic %s on pubsub %s (current: '%s', supplied: '%s')",
this.topic, this.pubsubName, this.defaultPath, path));
}
}
defaultPath = path;
return this;
}

/**
* Sets the dead letter topic for the subscription.
*
* @param deadLetterTopic Name of dead letter topic.
* @return this instance.
*/
DaprSubscriptionBuilder setDeadLetterTopic(String deadLetterTopic) {
if (this.deadLetterTopic != null) {
if (!this.deadLetterTopic.equals(deadLetterTopic)) {
throw new RuntimeException(
String.format(
"a default dead letter topic is already set for topic %s on pubsub %s (current: '%s', supplied: '%s')",
this.topic, this.pubsubName, this.deadLetterTopic, deadLetterTopic));
}
}
this.deadLetterTopic = deadLetterTopic;
return this;
}

/**
* Adds a rule to the subscription.
* @param path The path to route to.
* @param match The CEL expression the event must match.
*
* @param path The path to route to.
* @param match The CEL expression the event must match.
* @param priority The priority of the rule.
* @return this instance.
*/
public DaprSubscriptionBuilder addRule(String path, String match, int priority) {
if (rules.stream().anyMatch(e -> e.getPriority() == priority)) {
throw new RuntimeException(
String.format(
"a rule priority of %d is already used for topic %s on pubsub %s",
priority, this.topic, this.pubsubName));
String.format(
"a rule priority of %d is already used for topic %s on pubsub %s",
priority, this.topic, this.pubsubName));
}
rules.add(new TopicRule(path, match, priority));
return this;
}

/**
* Sets the metadata for the subscription.
*
* @param metadata The metadata.
* @return this instance.
*/
Expand All @@ -90,6 +116,7 @@ public DaprSubscriptionBuilder setMetadata(Map<String, String> metadata) {

/**
* Sets the bulkSubscribe configuration for the subscription.
*
* @param bulkSubscribe The bulk subscribe configuration.
* @return this instance.
*/
Expand All @@ -100,6 +127,7 @@ public DaprSubscriptionBuilder setBulkSubscribe(DaprTopicBulkSubscribe bulkSubsc

/**
* Builds the DaprTopicSubscription that is returned by the application to Dapr.
*
* @return The DaprTopicSubscription.
*/
public DaprTopicSubscription build() {
Expand All @@ -109,16 +137,19 @@ public DaprTopicSubscription build() {
if (!rules.isEmpty()) {
Collections.sort(rules, Comparator.comparingInt(TopicRule::getPriority));
List<DaprTopicRule> topicRules = rules.stream()
.map(e -> new DaprTopicRule(e.match, e.path)).collect(Collectors.toList());
.map(e -> new DaprTopicRule(e.match, e.path)).collect(Collectors.toList());
routes = new DaprTopicRoutes(topicRules, defaultPath);
} else {
route = defaultPath;
}

return new DaprTopicSubscription(this.pubsubName, this.topic, route, routes, metadata, bulkSubscribe);
return new DaprTopicSubscription(this.pubsubName, this.topic, route, this.deadLetterTopic,
routes, metadata,
bulkSubscribe);
}

private static class TopicRule {

private final String path;
private final String match;
private final int priority;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -20,64 +20,78 @@
* Class to represent a subscription topic along with its metadata.
*/
class DaprTopicSubscription {

private final String pubsubName;
private final String topic;
private final String route;
private final String deadLetterTopic;
private final DaprTopicRoutes routes;
private final Map<String, String> metadata;
private final DaprTopicBulkSubscribe bulkSubscribe;

/**
* Create a subscription topic.
* @param pubsubName The pubsub name to subscribe to.
* @param topic The topic to subscribe to.
* @param route Destination route for messages.
* @param metadata Metdata for extended subscription functionality.
*
* @param pubsubName The pubsub name to subscribe to.
* @param topic The topic to subscribe to.
* @param route Destination route for messages.
* @param deadLetterTopic Name of topic to forward undeliverable messages.
* @param metadata Metdata for extended subscription functionality.
*/
DaprTopicSubscription(String pubsubName, String topic, String route, Map<String, String> metadata) {
this(pubsubName, topic, route, metadata, null);
DaprTopicSubscription(String pubsubName, String topic, String route, String deadLetterTopic,
Map<String, String> metadata) {
this(pubsubName, topic, route, deadLetterTopic, metadata, null);
}

/**
* Create a subscription topic.
* @param pubsubName The pubsub name to subscribe to.
* @param topic The topic to subscribe to.
* @param route Destination route for messages.
* @param metadata Metdata for extended subscription functionality.
*
* @param pubsubName The pubsub name to subscribe to.
* @param topic The topic to subscribe to.
* @param route Destination route for messages.
* @param deadLetterTopic Name of topic to forward undeliverable messages.
* @param metadata Metadata for extended subscription functionality.
*/
DaprTopicSubscription(String pubsubName, String topic, String route,
Map<String, String> metadata, DaprTopicBulkSubscribe bulkSubscribe) {
this(pubsubName, topic, route, null, metadata, bulkSubscribe);
DaprTopicSubscription(String pubsubName, String topic, String route, String deadLetterTopic,
Map<String, String> metadata, DaprTopicBulkSubscribe bulkSubscribe) {
this(pubsubName, topic, route, deadLetterTopic,null, metadata, bulkSubscribe);
}

/**
* Create a subscription topic.
* @param pubsubName The pubsub name to subscribe to.
* @param topic The topic to subscribe to.
* @param route Destination route for messages.
* @param routes Destination routes with rules for messages.
* @param metadata Metadata for extended subscription functionality.
*
* @param pubsubName The pubsub name to subscribe to.
* @param topic The topic to subscribe to.
* @param route Destination route for messages.
* @param deadLetterTopic Name of topic to forward undeliverable messages.
* @param routes Destination routes with rules for messages.
* @param metadata Metadata for extended subscription functionality.
*/
DaprTopicSubscription(String pubsubName, String topic, String route, DaprTopicRoutes routes,
Map<String, String> metadata) {
this(pubsubName, topic, route, routes, metadata, null);
DaprTopicSubscription(String pubsubName, String topic, String route, String deadLetterTopic,
DaprTopicRoutes routes,
Map<String, String> metadata) {
this(pubsubName, topic, route, deadLetterTopic, routes, metadata, null);
}

/**
* Create a subscription topic.
* @param pubsubName The pubsub name to subscribe to.
* @param topic The topic to subscribe to.
* @param route Destination route for messages.
* @param routes Destination routes with rules for messages.
* @param metadata Metadata for extended subscription functionality.
* @param bulkSubscribe Bulk subscribe configuration.
*
* @param pubsubName The pubsub name to subscribe to.
* @param topic The topic to subscribe to.
* @param route Destination route for messages.
* @param deadLetterTopic Name of topic to forward undeliverable messages.
* @param routes Destination routes with rules for messages.
* @param metadata Metadata for extended subscription functionality.
* @param bulkSubscribe Bulk subscribe configuration.
*/
DaprTopicSubscription(String pubsubName, String topic, String route, DaprTopicRoutes routes,
Map<String, String> metadata, DaprTopicBulkSubscribe bulkSubscribe) {
DaprTopicSubscription(String pubsubName, String topic, String route, String deadLetterTopic,
DaprTopicRoutes routes,
Map<String, String> metadata, DaprTopicBulkSubscribe bulkSubscribe) {
this.pubsubName = pubsubName;
this.topic = topic;
this.route = route;
this.routes = routes;
this.deadLetterTopic = deadLetterTopic;
this.metadata = Collections.unmodifiableMap(metadata);
this.bulkSubscribe = bulkSubscribe;
}
Expand All @@ -94,6 +108,10 @@ public String getRoute() {
return route;
}

public String getDeadLetterTopic() {
return deadLetterTopic;
}

public DaprTopicRoutes getRoutes() {
return routes;
}
Expand Down
Loading

0 comments on commit 358af42

Please sign in to comment.