Skip to content

Commit

Permalink
[#3549] Improved getOrCreateTopic/Subscription flow for Pub/Sub messa…
Browse files Browse the repository at this point in the history
…ging infrastructure and changed the subscription expiration to never

Signed-off-by: Matthias Kaemmer <[email protected]>
  • Loading branch information
mattkaem committed Sep 25, 2023
1 parent a7ec5d6 commit 431cf59
Showing 1 changed file with 35 additions and 42 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -13,22 +13,20 @@
package org.eclipse.hono.client.pubsub;

import java.io.IOException;
import java.util.HashSet;
import java.util.Objects;
import java.util.Set;
import java.util.concurrent.TimeUnit;

import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import com.google.api.gax.core.CredentialsProvider;
import com.google.api.gax.rpc.AlreadyExistsException;
import com.google.api.gax.rpc.ApiException;
import com.google.cloud.pubsub.v1.SubscriptionAdminClient;
import com.google.cloud.pubsub.v1.SubscriptionAdminSettings;
import com.google.cloud.pubsub.v1.TopicAdminClient;
import com.google.cloud.pubsub.v1.TopicAdminSettings;
import com.google.protobuf.util.Durations;
import com.google.pubsub.v1.ExpirationPolicy;
import com.google.pubsub.v1.PushConfig;
import com.google.pubsub.v1.Subscription;
import com.google.pubsub.v1.SubscriptionName;
Expand All @@ -51,15 +49,6 @@ public class PubSubBasedAdminClientManager {
private static final long MESSAGE_RETENTION = 600000;
private final String projectId;

/**
* A set of existing subscriptions. It contains subscriptions with the format
* `"projects/{project}/subscriptions/{subscription}"`.
*/
private final Set<String> subscriptions = new HashSet<>();
/**
* A set of existing topics. It contains topics with the format `"projects/{project}/topics/{topic}"`.
*/
private final Set<String> topics = new HashSet<>();
private final CredentialsProvider credentialsProvider;
private SubscriptionAdminClient subscriptionAdminClient;
private TopicAdminClient topicAdminClient;
Expand Down Expand Up @@ -114,74 +103,80 @@ private Future<SubscriptionAdminClient> getOrCreateSubscriptionAdminClient() {
*
* @param endpoint The endpoint name of the topic, e.g. command_internal.
* @param prefix The prefix of the topic, e.g. the adapter instance ID.
* @return A succeeded Future if the topic is successfully created or already exists, or a failed
* Future if it could not be created.
* @return A succeeded Future if the topic is successfully created or already exists, or a failed Future if it could
* not be created.
*/
public Future<String> getOrCreateTopic(final String endpoint, final String prefix) {
final String topicName = PubSubMessageHelper.getTopicName(endpoint, prefix);
final TopicName topic = TopicName.of(projectId, topicName);

if (topics.contains(topic.toString())) {
LOG.debug("Topic {} already exists, continue", topic);
return Future.succeededFuture(topic.getTopic());
}
return getOrCreateTopicAdminClient()
.onFailure(thr -> LOG.debug("admin client creation failed", thr))
.compose(client -> getOrCreateTopic(projectId, topic, client));
.compose(client -> getTopic(topic, client)
.recover(thr -> createTopic(projectId, topic, client)));
}

private Future<String> getTopic(final TopicName topic, final TopicAdminClient client) {
try {
return Future.succeededFuture(client.getTopic(topic).getName());
} catch (ApiException e) {
return Future.failedFuture("Could not get topic");
}
}

private Future<String> getOrCreateTopic(final String projectId, final TopicName topic, final TopicAdminClient client) {
private Future<String> createTopic(final String projectId, final TopicName topic, final TopicAdminClient client) {
try {
final Topic createdTopic = client.createTopic(topic);
if (createdTopic == null) {
LOG.debug("Creating topic failed [topic: {}, projectId: {}]", topic, projectId);
return Future.failedFuture("Topic creation failed.");
}
topics.add(createdTopic.getName());
return Future.succeededFuture(topic.getTopic());
} catch (AlreadyExistsException ex) {
LOG.debug("Topic {} created successfully.", topic);
return Future.succeededFuture(topic.getTopic());
} catch (ApiException e) {
LOG.debug("Error creating topic {} on project {}", topic, projectId, e);
return Future.failedFuture("Topic creation failed.");
}

}

/**
* Gets an existing subscription or creates a new one on Pub/Sub based on the given subscription endpoint and prefix.
* Gets an existing subscription or creates a new one on Pub/Sub based on the given subscription endpoint and
* prefix.
*
* @param endpoint The endpoint name of the subscription, e.g. command_internal.
* @param prefix The prefix of the subscription, e.g. the adapter instance ID.
* @return A succeeded Future if the subscription is successfully created or already exists, or a failed
* Future if it could not be created.
* @return A succeeded Future if the subscription is successfully created or already exists, or a failed Future if
* it could not be created.
*/
public Future<String> getOrCreateSubscription(final String endpoint, final String prefix) {
final String topicAndSubscriptionName = PubSubMessageHelper.getTopicName(endpoint, prefix);
final TopicName topic = TopicName.of(projectId, topicAndSubscriptionName);
final SubscriptionName subscription = SubscriptionName.of(projectId, topicAndSubscriptionName);

if (subscriptions.contains(subscription.toString())) {
LOG.debug("Subscription {} already exists, continue", subscription);
return Future.succeededFuture(subscription.getSubscription());
}
return getOrCreateSubscriptionAdminClient()
.onFailure(thr -> LOG.debug("admin client creation failed", thr))
.compose(client -> getOrCreateSubscription(projectId, subscription, topic, client));
.compose(client -> getSubscription(subscription, client)
.recover(thr -> createSubscription(projectId, subscription, topic, client)));
}

private Future<String> getSubscription(final SubscriptionName subscription, final SubscriptionAdminClient client) {
try {
return Future.succeededFuture(client.getSubscription(subscription).getName());
} catch (ApiException e) {
return Future.failedFuture("Could not get topic");
}
}

private Future<String> getOrCreateSubscription(
final String projectId,
final SubscriptionName subscription,
final TopicName topic,
final SubscriptionAdminClient client) {
private Future<String> createSubscription(final String projectId, final SubscriptionName subscription,
final TopicName topic, final SubscriptionAdminClient client) {
try {
final Subscription request = Subscription.newBuilder()
.setName(subscription.toString())
.setTopic(topic.toString())
.setPushConfig(PushConfig.getDefaultInstance())
.setAckDeadlineSeconds(0)
.setMessageRetentionDuration(Durations.fromMillis(MESSAGE_RETENTION))
.setExpirationPolicy(ExpirationPolicy.getDefaultInstance())
.build();
final Subscription createdSubscription = client.createSubscription(request);
if (createdSubscription == null) {
Expand All @@ -190,9 +185,7 @@ private Future<String> getOrCreateSubscription(
projectId);
return Future.failedFuture("Subscription creation failed.");
}
subscriptions.add(createdSubscription.getName());
return Future.succeededFuture(subscription.getSubscription());
} catch (AlreadyExistsException ex) {
LOG.debug("Subscription {} created successfully.", subscription);
return Future.succeededFuture(subscription.getSubscription());
} catch (ApiException e) {
LOG.debug("Error creating subscription {} for topic {} on project {}", subscription, topic, projectId, e);
Expand All @@ -202,8 +195,8 @@ private Future<String> getOrCreateSubscription(

/**
* Closes the TopicAdminClient and the SubscriptionAdminClient if they exist. This method is expected to be invoked
* as soon as the TopicAdminClient and the SubscriptionAdminClient is no longer needed.
* This method will bock the current thread for up to 10 seconds!
* as soon as the TopicAdminClient and the SubscriptionAdminClient is no longer needed. This method will bock the
* current thread for up to 10 seconds!
*/
public void closeAdminClients() {
if (topicAdminClient == null && subscriptionAdminClient == null) {
Expand Down

0 comments on commit 431cf59

Please sign in to comment.