diff --git a/adapter-base/src/main/java/org/eclipse/hono/adapter/AbstractProtocolAdapterApplication.java b/adapter-base/src/main/java/org/eclipse/hono/adapter/AbstractProtocolAdapterApplication.java index 04e5d6881c..11292e1a26 100644 --- a/adapter-base/src/main/java/org/eclipse/hono/adapter/AbstractProtocolAdapterApplication.java +++ b/adapter-base/src/main/java/org/eclipse/hono/adapter/AbstractProtocolAdapterApplication.java @@ -59,7 +59,7 @@ import org.eclipse.hono.client.kafka.producer.KafkaProducerOptions; import org.eclipse.hono.client.kafka.producer.MessagingKafkaProducerConfigProperties; import org.eclipse.hono.client.notification.kafka.NotificationKafkaConsumerConfigProperties; -import org.eclipse.hono.client.pubsub.PubSubBasedAdminClientManagerFactoryImpl; +import org.eclipse.hono.client.pubsub.PubSubBasedAdminClientManager; import org.eclipse.hono.client.pubsub.PubSubConfigProperties; import org.eclipse.hono.client.pubsub.PubSubMessageHelper; import org.eclipse.hono.client.pubsub.PubSubPublisherOptions; @@ -442,7 +442,7 @@ protected void setCollaborators(final AbstractProtocolAdapterBase adapter) { vertx, pubSubConfigProperties.getProjectId(), provider); - final var pubSubBasedAdminClientManagerFactory = new PubSubBasedAdminClientManagerFactoryImpl( + final var pubSubBasedAdminClientManager = new PubSubBasedAdminClientManager( pubSubConfigProperties, provider, vertx); @@ -456,7 +456,7 @@ protected void setCollaborators(final AbstractProtocolAdapterBase adapter) { tenantClient, tracer, subscriberFactory, - pubSubBasedAdminClientManagerFactory, + pubSubBasedAdminClientManager, null )); } diff --git a/clients/command-pubsub/src/main/java/org/eclipse/hono/client/command/pubsub/PubSubBasedInternalCommandConsumer.java b/clients/command-pubsub/src/main/java/org/eclipse/hono/client/command/pubsub/PubSubBasedInternalCommandConsumer.java index 984539f8e8..24021d4545 100644 --- a/clients/command-pubsub/src/main/java/org/eclipse/hono/client/command/pubsub/PubSubBasedInternalCommandConsumer.java +++ b/clients/command-pubsub/src/main/java/org/eclipse/hono/client/command/pubsub/PubSubBasedInternalCommandConsumer.java @@ -24,7 +24,6 @@ import org.eclipse.hono.client.command.CommandResponseSender; import org.eclipse.hono.client.command.InternalCommandConsumer; import org.eclipse.hono.client.pubsub.PubSubBasedAdminClientManager; -import org.eclipse.hono.client.pubsub.PubSubBasedAdminClientManagerFactory; import org.eclipse.hono.client.pubsub.PubSubMessageHelper; import org.eclipse.hono.client.pubsub.subscriber.PubSubSubscriberFactory; import org.eclipse.hono.client.pubsub.tracing.PubSubTracingHelper; @@ -62,7 +61,7 @@ public class PubSubBasedInternalCommandConsumer implements InternalCommandConsum private final Tracer tracer; private final PubSubSubscriberFactory subscriberFactory; private final LifecycleStatus lifecycleStatus = new LifecycleStatus(); - private final PubSubBasedAdminClientManagerFactory adminClientManagerFactory; + private final PubSubBasedAdminClientManager adminClientManager; private final Vertx vertx; private final MessageReceiver receiver; @@ -76,7 +75,7 @@ public class PubSubBasedInternalCommandConsumer implements InternalCommandConsum * @param tenantClient The client to use for retrieving tenant configuration data. * @param tracer The OpenTracing tracer. * @param subscriberFactory The subscriber factory for creating Pub/Sub subscribers for receiving messages. - * @param adminClientManagerFactory The factory to create Pub/Sub based admin client manager to manage topics and + * @param adminClientManager The factory to create Pub/Sub based admin client manager to manage topics and * subscriptions. * @param receiver The message receiver used to process the received message. * @throws NullPointerException If any of these parameters except receiver are {@code null}. @@ -89,7 +88,7 @@ public PubSubBasedInternalCommandConsumer( final TenantClient tenantClient, final Tracer tracer, final PubSubSubscriberFactory subscriberFactory, - final PubSubBasedAdminClientManagerFactory adminClientManagerFactory, + final PubSubBasedAdminClientManager adminClientManager, final MessageReceiver receiver) { this.vertx = Objects.requireNonNull(vertx); this.commandResponseSender = Objects.requireNonNull(commandResponseSender); @@ -98,7 +97,7 @@ public PubSubBasedInternalCommandConsumer( this.tenantClient = Objects.requireNonNull(tenantClient); this.tracer = Objects.requireNonNull(tracer); this.subscriberFactory = Objects.requireNonNull(subscriberFactory); - this.adminClientManagerFactory = Objects.requireNonNull(adminClientManagerFactory); + this.adminClientManager = Objects.requireNonNull(adminClientManager); this.receiver = receiver != null ? receiver : createReceiver(); } @@ -143,7 +142,6 @@ public Future start() { } else if (!lifecycleStatus.setStarting()) { return Future.failedFuture(new IllegalStateException("subscriber is already started/stopping")); } - final PubSubBasedAdminClientManager adminClientManager = adminClientManagerFactory.createAdminClientManager(); return adminClientManager .getOrCreateTopic(CommandConstants.INTERNAL_COMMAND_ENDPOINT, adapterInstanceId) diff --git a/clients/command-pubsub/src/test/java/org/eclipse/hono/client/command/pubsub/PubSubBasedInternalCommandConsumerTest.java b/clients/command-pubsub/src/test/java/org/eclipse/hono/client/command/pubsub/PubSubBasedInternalCommandConsumerTest.java index 74faa09514..3abe813a3f 100644 --- a/clients/command-pubsub/src/test/java/org/eclipse/hono/client/command/pubsub/PubSubBasedInternalCommandConsumerTest.java +++ b/clients/command-pubsub/src/test/java/org/eclipse/hono/client/command/pubsub/PubSubBasedInternalCommandConsumerTest.java @@ -38,7 +38,6 @@ import org.eclipse.hono.client.command.CommandResponse; import org.eclipse.hono.client.command.CommandResponseSender; import org.eclipse.hono.client.pubsub.PubSubBasedAdminClientManager; -import org.eclipse.hono.client.pubsub.PubSubBasedAdminClientManagerFactory; import org.eclipse.hono.client.pubsub.PubSubMessageHelper; import org.eclipse.hono.client.pubsub.subscriber.PubSubSubscriberClient; import org.eclipse.hono.client.pubsub.subscriber.PubSubSubscriberFactory; @@ -120,9 +119,6 @@ void setUp() { .getOrCreateSubscription(CommandConstants.INTERNAL_COMMAND_ENDPOINT, adapterInstanceId)) .thenReturn(Future.succeededFuture(topicAndSubscription)); - final var adminClientManagerFactory = mock(PubSubBasedAdminClientManagerFactory.class); - when(adminClientManagerFactory.createAdminClientManager()).thenReturn(adminClientManager); - subscriber = mock(PubSubSubscriberClient.class); when(subscriber.subscribe(true)).thenReturn(Future.succeededFuture()); @@ -139,7 +135,7 @@ void setUp() { tenantClient, tracer, subscriberFactory, - adminClientManagerFactory, + adminClientManager, receiver); } diff --git a/clients/pubsub-common/src/main/java/org/eclipse/hono/client/pubsub/PubSubBasedAdminClientManager.java b/clients/pubsub-common/src/main/java/org/eclipse/hono/client/pubsub/PubSubBasedAdminClientManager.java index 0f7471afce..8b0c890ba7 100644 --- a/clients/pubsub-common/src/main/java/org/eclipse/hono/client/pubsub/PubSubBasedAdminClientManager.java +++ b/clients/pubsub-common/src/main/java/org/eclipse/hono/client/pubsub/PubSubBasedAdminClientManager.java @@ -21,6 +21,7 @@ import org.slf4j.LoggerFactory; import com.google.api.gax.core.CredentialsProvider; +import com.google.api.gax.rpc.ApiException; import com.google.api.gax.rpc.NotFoundException; import com.google.cloud.pubsub.v1.SubscriptionAdminClient; import com.google.cloud.pubsub.v1.SubscriptionAdminSettings; @@ -31,6 +32,7 @@ import com.google.pubsub.v1.PushConfig; import com.google.pubsub.v1.Subscription; import com.google.pubsub.v1.SubscriptionName; +import com.google.pubsub.v1.Topic; import com.google.pubsub.v1.TopicName; import io.vertx.core.Future; @@ -49,7 +51,6 @@ public class PubSubBasedAdminClientManager { */ private static final long MESSAGE_RETENTION = 600000; private final String projectId; - private final CredentialsProvider credentialsProvider; private final Vertx vertx; private SubscriptionAdminClient subscriptionAdminClient; @@ -58,14 +59,15 @@ public class PubSubBasedAdminClientManager { /** * Creates a new PubSubBasedAdminClientManager. * - * @param projectId The identifier of the Google Cloud Project to connect to. + * @param pubSubConfigProperties The Pub/Sub config properties containing the Google project ID. * @param credentialsProvider The provider for credentials to use for authenticating to the Pub/Sub service. * @param vertx The Vert.x instance to use. * @throws NullPointerException if vertx, credentialsProvider or projectId is {@code null}. */ - PubSubBasedAdminClientManager(final String projectId, final CredentialsProvider credentialsProvider, - final Vertx vertx) { - this.projectId = Objects.requireNonNull(projectId); + public PubSubBasedAdminClientManager(final PubSubConfigProperties pubSubConfigProperties, + final CredentialsProvider credentialsProvider, final Vertx vertx) { + Objects.requireNonNull(pubSubConfigProperties); + this.projectId = Objects.requireNonNull(pubSubConfigProperties.getProjectId()); this.credentialsProvider = Objects.requireNonNull(credentialsProvider); this.vertx = Objects.requireNonNull(vertx); } @@ -113,30 +115,43 @@ private Future getOrCreateSubscriptionAdminClient() { * not be created. */ public Future getOrCreateTopic(final String endpoint, final String prefix) { - final String topicName = PubSubMessageHelper.getTopicName(endpoint, prefix); - final TopicName topic = TopicName.of(projectId, topicName); + final TopicName topicName = TopicName.of(projectId, PubSubMessageHelper.getTopicName(endpoint, prefix)); return getOrCreateTopicAdminClient() .onFailure(thr -> LOG.debug("admin client creation failed", thr)) - .compose(client -> getTopic(topic, client) + .compose(client -> getTopic(topicName, client) .recover(thr -> { if (thr instanceof NotFoundException) { - return createTopic(topic, client); + return createTopic(topicName, client); } else { return Future.failedFuture(thr); } })); } - private Future getTopic(final TopicName topic, final TopicAdminClient client) { - return vertx.executeBlocking(promise -> promise.tryComplete(client.getTopic(topic).getName())); + private Future getTopic(final TopicName topicName, final TopicAdminClient client) { + return vertx.executeBlocking(promise -> { + try { + final Topic topic = client.getTopic(topicName); + promise.complete(topic.getName()); + } catch (ApiException e) { + promise.fail(e); + } + }); } - private Future createTopic(final TopicName topic, final TopicAdminClient client) { + private Future createTopic(final TopicName topicName, final TopicAdminClient client) { final Future createdTopic = vertx - .executeBlocking(promise -> promise.tryComplete(client.createTopic(topic).getName())); - createdTopic.onSuccess(top -> LOG.debug("Topic {} created successfully.", topic)) - .onFailure(thr -> LOG.debug("Creating topic failed [topic: {}, projectId: {}]", topic, projectId)); + .executeBlocking(promise -> { + try { + final Topic topic = client.createTopic(topicName); + promise.complete(topic.getName()); + } catch (ApiException e) { + promise.fail(e); + } + }); + createdTopic.onSuccess(top -> LOG.debug("Topic {} created successfully.", topicName)) + .onFailure(thr -> LOG.debug("Creating topic failed [topic: {}, projectId: {}]", topicName, projectId)); return createdTopic; } @@ -151,41 +166,56 @@ private Future createTopic(final TopicName topic, final TopicAdminClient */ public Future getOrCreateSubscription(final String endpoint, final String prefix) { final String topicAndSubscriptionName = PubSubMessageHelper.getTopicName(endpoint, prefix); - final TopicName topic = TopicName.of(projectId, topicAndSubscriptionName); - final SubscriptionName subscription = SubscriptionName.of(projectId, topicAndSubscriptionName); + final TopicName topicName = TopicName.of(projectId, topicAndSubscriptionName); + final SubscriptionName subscriptionName = SubscriptionName.of(projectId, topicAndSubscriptionName); return getOrCreateSubscriptionAdminClient() .onFailure(thr -> LOG.debug("admin client creation failed", thr)) - .compose(client -> getSubscription(subscription, client) + .compose(client -> getSubscription(subscriptionName, client) .recover(thr -> { if (thr instanceof NotFoundException) { - return createSubscription(subscription, topic, client); + return createSubscription(subscriptionName, topicName, client); } else { return Future.failedFuture(thr); } })); } - private Future getSubscription(final SubscriptionName subscription, final SubscriptionAdminClient client) { - return vertx.executeBlocking(promise -> promise.tryComplete(client.getSubscription(subscription).getName())); + private Future getSubscription(final SubscriptionName subscriptionName, + final SubscriptionAdminClient client) { + return vertx.executeBlocking(promise -> { + try { + final Subscription subscription = client.getSubscription(subscriptionName); + promise.complete(subscription.getName()); + } catch (ApiException e) { + promise.fail(e); + } + }); } - private Future createSubscription(final SubscriptionName subscription, final TopicName topic, + private Future createSubscription(final SubscriptionName subscriptionName, final TopicName topicName, final SubscriptionAdminClient client) { final Subscription request = Subscription.newBuilder() - .setName(subscription.toString()) - .setTopic(topic.toString()) + .setName(subscriptionName.toString()) + .setTopic(topicName.toString()) .setPushConfig(PushConfig.getDefaultInstance()) .setAckDeadlineSeconds(0) .setMessageRetentionDuration(Durations.fromMillis(MESSAGE_RETENTION)) .setExpirationPolicy(ExpirationPolicy.getDefaultInstance()) .build(); final Future createdSubscription = vertx - .executeBlocking(promise -> promise.tryComplete(client.createSubscription(request).getName())); - createdSubscription.onSuccess(sub -> LOG.debug("Subscription {} created successfully.", subscription)) + .executeBlocking(promise -> { + try { + final Subscription subscription = client.createSubscription(request); + promise.complete(subscription.getName()); + } catch (ApiException e) { + promise.fail(e); + } + }); + createdSubscription.onSuccess(sub -> LOG.debug("Subscription {} created successfully.", subscriptionName)) .onFailure( thr -> LOG.debug("Creating subscription failed [subscription: {}, topic: {}, project: {}]", - subscription, topic, projectId)); + subscriptionName, topicName, projectId)); return createdSubscription; } diff --git a/clients/pubsub-common/src/main/java/org/eclipse/hono/client/pubsub/PubSubBasedAdminClientManagerFactory.java b/clients/pubsub-common/src/main/java/org/eclipse/hono/client/pubsub/PubSubBasedAdminClientManagerFactory.java deleted file mode 100644 index 433c4808c8..0000000000 --- a/clients/pubsub-common/src/main/java/org/eclipse/hono/client/pubsub/PubSubBasedAdminClientManagerFactory.java +++ /dev/null @@ -1,27 +0,0 @@ -/******************************************************************************* - * Copyright (c) 2023 Contributors to the Eclipse Foundation - * - * See the NOTICE file(s) distributed with this work for additional - * information regarding copyright ownership. - * - * This program and the accompanying materials are made available under the - * terms of the Eclipse Public License 2.0 which is available at - * http://www.eclipse.org/legal/epl-2.0 - * - * SPDX-License-Identifier: EPL-2.0 - *******************************************************************************/ - -package org.eclipse.hono.client.pubsub; - -/** - * A factory to create PubSubBasedAdminClientManager instances. - */ -public interface PubSubBasedAdminClientManagerFactory { - - /** - * Creates a new PubSubBasedAdminClientManager instance. - * - * @return A new PubSubBasedAdminClientManager instance. - */ - PubSubBasedAdminClientManager createAdminClientManager(); -} diff --git a/clients/pubsub-common/src/main/java/org/eclipse/hono/client/pubsub/PubSubBasedAdminClientManagerFactoryImpl.java b/clients/pubsub-common/src/main/java/org/eclipse/hono/client/pubsub/PubSubBasedAdminClientManagerFactoryImpl.java deleted file mode 100644 index 44c147c9c6..0000000000 --- a/clients/pubsub-common/src/main/java/org/eclipse/hono/client/pubsub/PubSubBasedAdminClientManagerFactoryImpl.java +++ /dev/null @@ -1,53 +0,0 @@ -/******************************************************************************* - * Copyright (c) 2023 Contributors to the Eclipse Foundation - * - * See the NOTICE file(s) distributed with this work for additional - * information regarding copyright ownership. - * - * This program and the accompanying materials are made available under the - * terms of the Eclipse Public License 2.0 which is available at - * http://www.eclipse.org/legal/epl-2.0 - * - * SPDX-License-Identifier: EPL-2.0 - *******************************************************************************/ - -package org.eclipse.hono.client.pubsub; - -import java.util.Objects; - -import com.google.api.gax.core.CredentialsProvider; - -import io.vertx.core.Vertx; -import jakarta.enterprise.context.ApplicationScoped; - -/** - * A factory to create PubSubBasedAdminClientManagerImpl instances. - */ -@ApplicationScoped -public class PubSubBasedAdminClientManagerFactoryImpl implements PubSubBasedAdminClientManagerFactory { - - private final String projectId; - private final Vertx vertx; - private final CredentialsProvider credentialsProvider; - - /** - * Creates a new PubSubBasedAdminClientManagerFactory. - * - * @param pubSubConfigProperties The Pub/Sub config properties containing the Google project ID. - * @param vertx The Vert.x instance to use. - * @param credentialsProvider The Google credentials provider to use. - * @throws NullPointerException if vertx, projectId or credentialsProvider is {@code null}. - */ - public PubSubBasedAdminClientManagerFactoryImpl(final PubSubConfigProperties pubSubConfigProperties, - final CredentialsProvider credentialsProvider, final Vertx vertx) { - Objects.requireNonNull(pubSubConfigProperties); - this.projectId = Objects.requireNonNull(pubSubConfigProperties.getProjectId()); - this.credentialsProvider = Objects.requireNonNull(credentialsProvider); - this.vertx = Objects.requireNonNull(vertx); - } - - @Override - public PubSubBasedAdminClientManager createAdminClientManager() { - return new PubSubBasedAdminClientManager(projectId, credentialsProvider, vertx); - } -}