From 431cf59d89ca3440fe3eeab603da0cbbb6f3bd99 Mon Sep 17 00:00:00 2001 From: Matthias Kaemmer Date: Fri, 8 Sep 2023 14:44:00 +0200 Subject: [PATCH] [#3549] Improved getOrCreateTopic/Subscription flow for Pub/Sub messaging infrastructure and changed the subscription expiration to never Signed-off-by: Matthias Kaemmer --- .../pubsub/PubSubBasedAdminClientManager.java | 77 +++++++++---------- 1 file changed, 35 insertions(+), 42 deletions(-) diff --git a/clients/pubsub-common/src/main/java/org/eclipse/hono/client/pubsub/PubSubBasedAdminClientManager.java b/clients/pubsub-common/src/main/java/org/eclipse/hono/client/pubsub/PubSubBasedAdminClientManager.java index 93e9484472..697c214cb2 100644 --- a/clients/pubsub-common/src/main/java/org/eclipse/hono/client/pubsub/PubSubBasedAdminClientManager.java +++ b/clients/pubsub-common/src/main/java/org/eclipse/hono/client/pubsub/PubSubBasedAdminClientManager.java @@ -13,22 +13,20 @@ package org.eclipse.hono.client.pubsub; import java.io.IOException; -import java.util.HashSet; import java.util.Objects; -import java.util.Set; import java.util.concurrent.TimeUnit; import org.slf4j.Logger; import org.slf4j.LoggerFactory; import com.google.api.gax.core.CredentialsProvider; -import com.google.api.gax.rpc.AlreadyExistsException; import com.google.api.gax.rpc.ApiException; import com.google.cloud.pubsub.v1.SubscriptionAdminClient; import com.google.cloud.pubsub.v1.SubscriptionAdminSettings; import com.google.cloud.pubsub.v1.TopicAdminClient; import com.google.cloud.pubsub.v1.TopicAdminSettings; import com.google.protobuf.util.Durations; +import com.google.pubsub.v1.ExpirationPolicy; import com.google.pubsub.v1.PushConfig; import com.google.pubsub.v1.Subscription; import com.google.pubsub.v1.SubscriptionName; @@ -51,15 +49,6 @@ public class PubSubBasedAdminClientManager { private static final long MESSAGE_RETENTION = 600000; private final String projectId; - /** - * A set of existing subscriptions. It contains subscriptions with the format - * `"projects/{project}/subscriptions/{subscription}"`. - */ - private final Set subscriptions = new HashSet<>(); - /** - * A set of existing topics. It contains topics with the format `"projects/{project}/topics/{topic}"`. - */ - private final Set topics = new HashSet<>(); private final CredentialsProvider credentialsProvider; private SubscriptionAdminClient subscriptionAdminClient; private TopicAdminClient topicAdminClient; @@ -114,67 +103,72 @@ private Future getOrCreateSubscriptionAdminClient() { * * @param endpoint The endpoint name of the topic, e.g. command_internal. * @param prefix The prefix of the topic, e.g. the adapter instance ID. - * @return A succeeded Future if the topic is successfully created or already exists, or a failed - * Future if it could not be created. + * @return A succeeded Future if the topic is successfully created or already exists, or a failed Future if it could + * not be created. */ public Future getOrCreateTopic(final String endpoint, final String prefix) { final String topicName = PubSubMessageHelper.getTopicName(endpoint, prefix); final TopicName topic = TopicName.of(projectId, topicName); - if (topics.contains(topic.toString())) { - LOG.debug("Topic {} already exists, continue", topic); - return Future.succeededFuture(topic.getTopic()); - } return getOrCreateTopicAdminClient() .onFailure(thr -> LOG.debug("admin client creation failed", thr)) - .compose(client -> getOrCreateTopic(projectId, topic, client)); + .compose(client -> getTopic(topic, client) + .recover(thr -> createTopic(projectId, topic, client))); + } + + private Future getTopic(final TopicName topic, final TopicAdminClient client) { + try { + return Future.succeededFuture(client.getTopic(topic).getName()); + } catch (ApiException e) { + return Future.failedFuture("Could not get topic"); + } } - private Future getOrCreateTopic(final String projectId, final TopicName topic, final TopicAdminClient client) { + private Future createTopic(final String projectId, final TopicName topic, final TopicAdminClient client) { try { final Topic createdTopic = client.createTopic(topic); if (createdTopic == null) { LOG.debug("Creating topic failed [topic: {}, projectId: {}]", topic, projectId); return Future.failedFuture("Topic creation failed."); } - topics.add(createdTopic.getName()); - return Future.succeededFuture(topic.getTopic()); - } catch (AlreadyExistsException ex) { + LOG.debug("Topic {} created successfully.", topic); return Future.succeededFuture(topic.getTopic()); } catch (ApiException e) { LOG.debug("Error creating topic {} on project {}", topic, projectId, e); return Future.failedFuture("Topic creation failed."); } - } /** - * Gets an existing subscription or creates a new one on Pub/Sub based on the given subscription endpoint and prefix. + * Gets an existing subscription or creates a new one on Pub/Sub based on the given subscription endpoint and + * prefix. * * @param endpoint The endpoint name of the subscription, e.g. command_internal. * @param prefix The prefix of the subscription, e.g. the adapter instance ID. - * @return A succeeded Future if the subscription is successfully created or already exists, or a failed - * Future if it could not be created. + * @return A succeeded Future if the subscription is successfully created or already exists, or a failed Future if + * it could not be created. */ public Future getOrCreateSubscription(final String endpoint, final String prefix) { final String topicAndSubscriptionName = PubSubMessageHelper.getTopicName(endpoint, prefix); final TopicName topic = TopicName.of(projectId, topicAndSubscriptionName); final SubscriptionName subscription = SubscriptionName.of(projectId, topicAndSubscriptionName); - if (subscriptions.contains(subscription.toString())) { - LOG.debug("Subscription {} already exists, continue", subscription); - return Future.succeededFuture(subscription.getSubscription()); - } return getOrCreateSubscriptionAdminClient() .onFailure(thr -> LOG.debug("admin client creation failed", thr)) - .compose(client -> getOrCreateSubscription(projectId, subscription, topic, client)); + .compose(client -> getSubscription(subscription, client) + .recover(thr -> createSubscription(projectId, subscription, topic, client))); + } + + private Future getSubscription(final SubscriptionName subscription, final SubscriptionAdminClient client) { + try { + return Future.succeededFuture(client.getSubscription(subscription).getName()); + } catch (ApiException e) { + return Future.failedFuture("Could not get topic"); + } } - private Future getOrCreateSubscription( - final String projectId, - final SubscriptionName subscription, - final TopicName topic, - final SubscriptionAdminClient client) { + private Future createSubscription(final String projectId, final SubscriptionName subscription, + final TopicName topic, final SubscriptionAdminClient client) { try { final Subscription request = Subscription.newBuilder() .setName(subscription.toString()) @@ -182,6 +176,7 @@ private Future getOrCreateSubscription( .setPushConfig(PushConfig.getDefaultInstance()) .setAckDeadlineSeconds(0) .setMessageRetentionDuration(Durations.fromMillis(MESSAGE_RETENTION)) + .setExpirationPolicy(ExpirationPolicy.getDefaultInstance()) .build(); final Subscription createdSubscription = client.createSubscription(request); if (createdSubscription == null) { @@ -190,9 +185,7 @@ private Future getOrCreateSubscription( projectId); return Future.failedFuture("Subscription creation failed."); } - subscriptions.add(createdSubscription.getName()); - return Future.succeededFuture(subscription.getSubscription()); - } catch (AlreadyExistsException ex) { + LOG.debug("Subscription {} created successfully.", subscription); return Future.succeededFuture(subscription.getSubscription()); } catch (ApiException e) { LOG.debug("Error creating subscription {} for topic {} on project {}", subscription, topic, projectId, e); @@ -202,8 +195,8 @@ private Future getOrCreateSubscription( /** * Closes the TopicAdminClient and the SubscriptionAdminClient if they exist. This method is expected to be invoked - * as soon as the TopicAdminClient and the SubscriptionAdminClient is no longer needed. - * This method will bock the current thread for up to 10 seconds! + * as soon as the TopicAdminClient and the SubscriptionAdminClient is no longer needed. This method will bock the + * current thread for up to 10 seconds! */ public void closeAdminClients() { if (topicAdminClient == null && subscriptionAdminClient == null) {