From 0ab4653b743e9c8588cadbb71e9fba7ed420a6d2 Mon Sep 17 00:00:00 2001 From: Matthias Kaemmer Date: Thu, 5 Oct 2023 13:28:55 +0200 Subject: [PATCH] Review fixes: changed code to execute calls to pubsub on worker thread plus some additional refactoring Signed-off-by: Matthias Kaemmer --- .../AbstractProtocolAdapterApplication.java | 10 +- .../PubSubBasedInternalCommandConsumer.java | 110 +++++------------- ...ubSubBasedInternalCommandConsumerTest.java | 7 +- .../pubsub/PubSubBasedAdminClientManager.java | 106 ++++++++--------- .../PubSubBasedAdminClientManagerFactory.java | 27 +++++ ...SubBasedAdminClientManagerFactoryImpl.java | 53 +++++++++ .../client/pubsub/PubSubConfigProperties.java | 4 + .../app/CommonConfigPropertiesProducer.java | 8 -- 8 files changed, 178 insertions(+), 147 deletions(-) create mode 100644 clients/pubsub-common/src/main/java/org/eclipse/hono/client/pubsub/PubSubBasedAdminClientManagerFactory.java create mode 100644 clients/pubsub-common/src/main/java/org/eclipse/hono/client/pubsub/PubSubBasedAdminClientManagerFactoryImpl.java diff --git a/adapter-base/src/main/java/org/eclipse/hono/adapter/AbstractProtocolAdapterApplication.java b/adapter-base/src/main/java/org/eclipse/hono/adapter/AbstractProtocolAdapterApplication.java index 468d100464..04e5d6881c 100644 --- a/adapter-base/src/main/java/org/eclipse/hono/adapter/AbstractProtocolAdapterApplication.java +++ b/adapter-base/src/main/java/org/eclipse/hono/adapter/AbstractProtocolAdapterApplication.java @@ -59,6 +59,7 @@ import org.eclipse.hono.client.kafka.producer.KafkaProducerOptions; import org.eclipse.hono.client.kafka.producer.MessagingKafkaProducerConfigProperties; import org.eclipse.hono.client.notification.kafka.NotificationKafkaConsumerConfigProperties; +import org.eclipse.hono.client.pubsub.PubSubBasedAdminClientManagerFactoryImpl; import org.eclipse.hono.client.pubsub.PubSubConfigProperties; import org.eclipse.hono.client.pubsub.PubSubMessageHelper; import org.eclipse.hono.client.pubsub.PubSubPublisherOptions; @@ -441,6 +442,10 @@ protected void setCollaborators(final AbstractProtocolAdapterBase adapter) { vertx, pubSubConfigProperties.getProjectId(), provider); + final var pubSubBasedAdminClientManagerFactory = new PubSubBasedAdminClientManagerFactoryImpl( + pubSubConfigProperties, + provider, + vertx); commandConsumerFactory.registerInternalCommandConsumer( (id, handlers) -> new PubSubBasedInternalCommandConsumer( @@ -451,8 +456,9 @@ protected void setCollaborators(final AbstractProtocolAdapterBase adapter) { tenantClient, tracer, subscriberFactory, - pubSubConfigProperties.getProjectId(), - provider)); + pubSubBasedAdminClientManagerFactory, + null + )); } }, () -> LOG.error("Could not initialize Pub/Sub based internal command consumer, no Credentials Provider present.")); } diff --git a/clients/command-pubsub/src/main/java/org/eclipse/hono/client/command/pubsub/PubSubBasedInternalCommandConsumer.java b/clients/command-pubsub/src/main/java/org/eclipse/hono/client/command/pubsub/PubSubBasedInternalCommandConsumer.java index 946ef3da7d..984539f8e8 100644 --- a/clients/command-pubsub/src/main/java/org/eclipse/hono/client/command/pubsub/PubSubBasedInternalCommandConsumer.java +++ b/clients/command-pubsub/src/main/java/org/eclipse/hono/client/command/pubsub/PubSubBasedInternalCommandConsumer.java @@ -10,6 +10,7 @@ * * SPDX-License-Identifier: EPL-2.0 */ + package org.eclipse.hono.client.command.pubsub; import java.net.HttpURLConnection; @@ -23,6 +24,7 @@ import org.eclipse.hono.client.command.CommandResponseSender; import org.eclipse.hono.client.command.InternalCommandConsumer; import org.eclipse.hono.client.pubsub.PubSubBasedAdminClientManager; +import org.eclipse.hono.client.pubsub.PubSubBasedAdminClientManagerFactory; import org.eclipse.hono.client.pubsub.PubSubMessageHelper; import org.eclipse.hono.client.pubsub.subscriber.PubSubSubscriberFactory; import org.eclipse.hono.client.pubsub.tracing.PubSubTracingHelper; @@ -33,7 +35,6 @@ import org.slf4j.Logger; import org.slf4j.LoggerFactory; -import com.google.api.gax.core.CredentialsProvider; import com.google.cloud.pubsub.v1.AckReplyConsumer; import com.google.cloud.pubsub.v1.MessageReceiver; import com.google.pubsub.v1.PubsubMessage; @@ -61,9 +62,9 @@ public class PubSubBasedInternalCommandConsumer implements InternalCommandConsum private final Tracer tracer; private final PubSubSubscriberFactory subscriberFactory; private final LifecycleStatus lifecycleStatus = new LifecycleStatus(); - private final PubSubBasedAdminClientManager adminClientManager; + private final PubSubBasedAdminClientManagerFactory adminClientManagerFactory; private final Vertx vertx; - private MessageReceiver receiver; + private final MessageReceiver receiver; /** * Creates a Pub/Sub based internal command consumer. @@ -75,54 +76,10 @@ public class PubSubBasedInternalCommandConsumer implements InternalCommandConsum * @param tenantClient The client to use for retrieving tenant configuration data. * @param tracer The OpenTracing tracer. * @param subscriberFactory The subscriber factory for creating Pub/Sub subscribers for receiving messages. - * @param projectId The identifier of the Google Cloud Project to connect to. - * @param credentialsProvider The provider for credentials to use for authenticating to the Pub/Sub service. - * @throws NullPointerException If any of these parameters are {@code null}. - */ - public PubSubBasedInternalCommandConsumer( - final CommandResponseSender commandResponseSender, - final Vertx vertx, - final String adapterInstanceId, - final CommandHandlers commandHandlers, - final TenantClient tenantClient, - final Tracer tracer, - final PubSubSubscriberFactory subscriberFactory, - final String projectId, - final CredentialsProvider credentialsProvider) { - Objects.requireNonNull(projectId); - Objects.requireNonNull(credentialsProvider); - this.vertx = Objects.requireNonNull(vertx); - this.commandResponseSender = Objects.requireNonNull(commandResponseSender); - this.adapterInstanceId = Objects.requireNonNull(adapterInstanceId); - this.commandHandlers = Objects.requireNonNull(commandHandlers); - this.tenantClient = Objects.requireNonNull(tenantClient); - this.tracer = Objects.requireNonNull(tracer); - this.subscriberFactory = Objects.requireNonNull(subscriberFactory); - this.adminClientManager = new PubSubBasedAdminClientManager(projectId, credentialsProvider); - createReceiver(); - adminClientManager - .getOrCreateTopic(CommandConstants.INTERNAL_COMMAND_ENDPOINT, adapterInstanceId) - .onFailure(thr -> log.error("Could not create topic for endpoint {} and {}", - CommandConstants.INTERNAL_COMMAND_ENDPOINT, adapterInstanceId, thr)) - .compose(t -> adminClientManager.getOrCreateSubscription(CommandConstants.INTERNAL_COMMAND_ENDPOINT, adapterInstanceId)) - .onFailure(thr -> log.error("Could not create subscription for endpoint {} and {}", - CommandConstants.INTERNAL_COMMAND_ENDPOINT, adapterInstanceId, thr)) - .onSuccess(s -> subscriberFactory.getOrCreateSubscriber(s, receiver)); - } - - /** - * Creates a Pub/Sub based internal command consumer. To be used for Unittests. - * - * @param commandResponseSender The sender used to send command responses. - * @param vertx The Vert.x instance to use. - * @param adapterInstanceId The adapter instance id. - * @param commandHandlers The command handlers to choose from for handling a received command. - * @param tenantClient The client to use for retrieving tenant configuration data. - * @param tracer The OpenTracing tracer. - * @param subscriberFactory The subscriber factory for creating Pub/Sub subscribers for receiving messages. - * @param adminClientManager The Pub/Sub based admin client manager to manage topics and subscriptions. + * @param adminClientManagerFactory The factory to create Pub/Sub based admin client manager to manage topics and + * subscriptions. * @param receiver The message receiver used to process the received message. - * @throws NullPointerException If any of these parameters are {@code null}. + * @throws NullPointerException If any of these parameters except receiver are {@code null}. */ public PubSubBasedInternalCommandConsumer( final CommandResponseSender commandResponseSender, @@ -132,7 +89,7 @@ public PubSubBasedInternalCommandConsumer( final TenantClient tenantClient, final Tracer tracer, final PubSubSubscriberFactory subscriberFactory, - final PubSubBasedAdminClientManager adminClientManager, + final PubSubBasedAdminClientManagerFactory adminClientManagerFactory, final MessageReceiver receiver) { this.vertx = Objects.requireNonNull(vertx); this.commandResponseSender = Objects.requireNonNull(commandResponseSender); @@ -141,16 +98,12 @@ public PubSubBasedInternalCommandConsumer( this.tenantClient = Objects.requireNonNull(tenantClient); this.tracer = Objects.requireNonNull(tracer); this.subscriberFactory = Objects.requireNonNull(subscriberFactory); - this.adminClientManager = Objects.requireNonNull(adminClientManager); - this.receiver = Objects.requireNonNull(receiver); - adminClientManager - .getOrCreateTopic(CommandConstants.INTERNAL_COMMAND_ENDPOINT, adapterInstanceId) - .onFailure(thr -> log.error("Could not create topic for endpoint {} and {}", - CommandConstants.INTERNAL_COMMAND_ENDPOINT, adapterInstanceId, thr)) - .compose(t -> adminClientManager.getOrCreateSubscription(CommandConstants.INTERNAL_COMMAND_ENDPOINT, adapterInstanceId)) - .onFailure(thr -> log.error("Could not create subscription for endpoint {} and {}", - CommandConstants.INTERNAL_COMMAND_ENDPOINT, adapterInstanceId, thr)) - .onSuccess(s -> subscriberFactory.getOrCreateSubscriber(s, receiver)); + this.adminClientManagerFactory = Objects.requireNonNull(adminClientManagerFactory); + this.receiver = receiver != null ? receiver : createReceiver(); + } + + private MessageReceiver createReceiver() { + return this::handleCommandMessage; } @Override @@ -190,22 +143,28 @@ public Future start() { } else if (!lifecycleStatus.setStarting()) { return Future.failedFuture(new IllegalStateException("subscriber is already started/stopping")); } + final PubSubBasedAdminClientManager adminClientManager = adminClientManagerFactory.createAdminClientManager(); - final String subscriptionId = PubSubMessageHelper.getTopicName( - CommandConstants.INTERNAL_COMMAND_ENDPOINT, - adapterInstanceId); - return subscriberFactory - .getOrCreateSubscriber(subscriptionId, receiver) - .subscribe(true) + return adminClientManager + .getOrCreateTopic(CommandConstants.INTERNAL_COMMAND_ENDPOINT, adapterInstanceId) + .onFailure(thr -> log.error("Could not create topic for endpoint {} and {}", + CommandConstants.INTERNAL_COMMAND_ENDPOINT, adapterInstanceId, thr)) + .compose(t -> adminClientManager.getOrCreateSubscription(CommandConstants.INTERNAL_COMMAND_ENDPOINT, + adapterInstanceId)) + .onComplete(v -> vertx.executeBlocking(promise -> { + adminClientManager.closeAdminClients(); + promise.complete(); + })) + .onFailure(thr -> log.error("Could not create subscription for endpoint {} and {}", + CommandConstants.INTERNAL_COMMAND_ENDPOINT, adapterInstanceId, thr)) + .compose(s -> subscriberFactory.getOrCreateSubscriber(PubSubMessageHelper.getTopicName( + CommandConstants.INTERNAL_COMMAND_ENDPOINT, + adapterInstanceId), receiver).subscribe(true)) .onSuccess(s -> lifecycleStatus.setStarted()) .onFailure( e -> log.warn("Error starting Internal Command Consumer for adapter {}", adapterInstanceId, e)); } - private void createReceiver() { - receiver = this::handleCommandMessage; - } - Future handleCommandMessage(final PubsubMessage message, final AckReplyConsumer consumer) { consumer.ack(); final PubSubBasedCommand command; @@ -260,13 +219,8 @@ Future handleCommandMessage(final PubsubMessage message, final AckReplyCon @Override public Future stop() { - return lifecycleStatus.runStopAttempt(() -> Future.all( - subscriberFactory.closeSubscriber(CommandConstants.INTERNAL_COMMAND_ENDPOINT, adapterInstanceId), - vertx.executeBlocking(promise -> { - adminClientManager.closeAdminClients(); - promise.complete(); - }) - ).mapEmpty()); + return lifecycleStatus.runStopAttempt( + () -> subscriberFactory.closeSubscriber(CommandConstants.INTERNAL_COMMAND_ENDPOINT, adapterInstanceId)); } } diff --git a/clients/command-pubsub/src/test/java/org/eclipse/hono/client/command/pubsub/PubSubBasedInternalCommandConsumerTest.java b/clients/command-pubsub/src/test/java/org/eclipse/hono/client/command/pubsub/PubSubBasedInternalCommandConsumerTest.java index 98b4ff625a..74faa09514 100644 --- a/clients/command-pubsub/src/test/java/org/eclipse/hono/client/command/pubsub/PubSubBasedInternalCommandConsumerTest.java +++ b/clients/command-pubsub/src/test/java/org/eclipse/hono/client/command/pubsub/PubSubBasedInternalCommandConsumerTest.java @@ -10,6 +10,7 @@ * * SPDX-License-Identifier: EPL-2.0 */ + package org.eclipse.hono.client.command.pubsub; import static org.mockito.ArgumentMatchers.any; @@ -37,6 +38,7 @@ import org.eclipse.hono.client.command.CommandResponse; import org.eclipse.hono.client.command.CommandResponseSender; import org.eclipse.hono.client.pubsub.PubSubBasedAdminClientManager; +import org.eclipse.hono.client.pubsub.PubSubBasedAdminClientManagerFactory; import org.eclipse.hono.client.pubsub.PubSubMessageHelper; import org.eclipse.hono.client.pubsub.subscriber.PubSubSubscriberClient; import org.eclipse.hono.client.pubsub.subscriber.PubSubSubscriberFactory; @@ -118,6 +120,9 @@ void setUp() { .getOrCreateSubscription(CommandConstants.INTERNAL_COMMAND_ENDPOINT, adapterInstanceId)) .thenReturn(Future.succeededFuture(topicAndSubscription)); + final var adminClientManagerFactory = mock(PubSubBasedAdminClientManagerFactory.class); + when(adminClientManagerFactory.createAdminClientManager()).thenReturn(adminClientManager); + subscriber = mock(PubSubSubscriberClient.class); when(subscriber.subscribe(true)).thenReturn(Future.succeededFuture()); @@ -134,7 +139,7 @@ void setUp() { tenantClient, tracer, subscriberFactory, - adminClientManager, + adminClientManagerFactory, receiver); } diff --git a/clients/pubsub-common/src/main/java/org/eclipse/hono/client/pubsub/PubSubBasedAdminClientManager.java b/clients/pubsub-common/src/main/java/org/eclipse/hono/client/pubsub/PubSubBasedAdminClientManager.java index 77e88cc0e8..0f7471afce 100644 --- a/clients/pubsub-common/src/main/java/org/eclipse/hono/client/pubsub/PubSubBasedAdminClientManager.java +++ b/clients/pubsub-common/src/main/java/org/eclipse/hono/client/pubsub/PubSubBasedAdminClientManager.java @@ -10,18 +10,18 @@ * * SPDX-License-Identifier: EPL-2.0 *******************************************************************************/ + package org.eclipse.hono.client.pubsub; import java.io.IOException; import java.util.Objects; import java.util.concurrent.TimeUnit; -import org.eclipse.hono.client.ServerErrorException; import org.slf4j.Logger; import org.slf4j.LoggerFactory; import com.google.api.gax.core.CredentialsProvider; -import com.google.api.gax.rpc.ApiException; +import com.google.api.gax.rpc.NotFoundException; import com.google.cloud.pubsub.v1.SubscriptionAdminClient; import com.google.cloud.pubsub.v1.SubscriptionAdminSettings; import com.google.cloud.pubsub.v1.TopicAdminClient; @@ -31,10 +31,10 @@ import com.google.pubsub.v1.PushConfig; import com.google.pubsub.v1.Subscription; import com.google.pubsub.v1.SubscriptionName; -import com.google.pubsub.v1.Topic; import com.google.pubsub.v1.TopicName; import io.vertx.core.Future; +import io.vertx.core.Vertx; /** * A Pub/Sub based admin client manager to manage topics and subscriptions. Wraps a TopicAdminClient and a @@ -51,6 +51,7 @@ public class PubSubBasedAdminClientManager { private final String projectId; private final CredentialsProvider credentialsProvider; + private final Vertx vertx; private SubscriptionAdminClient subscriptionAdminClient; private TopicAdminClient topicAdminClient; @@ -59,10 +60,14 @@ public class PubSubBasedAdminClientManager { * * @param projectId The identifier of the Google Cloud Project to connect to. * @param credentialsProvider The provider for credentials to use for authenticating to the Pub/Sub service. + * @param vertx The Vert.x instance to use. + * @throws NullPointerException if vertx, credentialsProvider or projectId is {@code null}. */ - public PubSubBasedAdminClientManager(final String projectId, final CredentialsProvider credentialsProvider) { + PubSubBasedAdminClientManager(final String projectId, final CredentialsProvider credentialsProvider, + final Vertx vertx) { this.projectId = Objects.requireNonNull(projectId); this.credentialsProvider = Objects.requireNonNull(credentialsProvider); + this.vertx = Objects.requireNonNull(vertx); } private Future getOrCreateTopicAdminClient() { @@ -114,32 +119,25 @@ public Future getOrCreateTopic(final String endpoint, final String prefi return getOrCreateTopicAdminClient() .onFailure(thr -> LOG.debug("admin client creation failed", thr)) .compose(client -> getTopic(topic, client) - .recover(thr -> createTopic(projectId, topic, client))); + .recover(thr -> { + if (thr instanceof NotFoundException) { + return createTopic(topic, client); + } else { + return Future.failedFuture(thr); + } + })); } private Future getTopic(final TopicName topic, final TopicAdminClient client) { - try { - return Future.succeededFuture(client.getTopic(topic).getName()); - } catch (ApiException e) { - return Future.failedFuture( - new ServerErrorException(503, String.format("Could not get topic %s.", topic.getTopic()), e)); - } + return vertx.executeBlocking(promise -> promise.tryComplete(client.getTopic(topic).getName())); } - private Future createTopic(final String projectId, final TopicName topic, final TopicAdminClient client) { - try { - final Topic createdTopic = client.createTopic(topic); - if (createdTopic == null) { - LOG.debug("Creating topic failed [topic: {}, projectId: {}]", topic, projectId); - return Future.failedFuture("Topic creation failed."); - } - LOG.debug("Topic {} created successfully.", topic); - return Future.succeededFuture(topic.getTopic()); - } catch (ApiException e) { - LOG.debug("Error creating topic {} on project {}", topic, projectId, e); - return Future.failedFuture(new ServerErrorException(503, - String.format("Topic creation failed for topic %s.", topic.getTopic()), e)); - } + private Future createTopic(final TopicName topic, final TopicAdminClient client) { + final Future createdTopic = vertx + .executeBlocking(promise -> promise.tryComplete(client.createTopic(topic).getName())); + createdTopic.onSuccess(top -> LOG.debug("Topic {} created successfully.", topic)) + .onFailure(thr -> LOG.debug("Creating topic failed [topic: {}, projectId: {}]", topic, projectId)); + return createdTopic; } /** @@ -159,44 +157,36 @@ public Future getOrCreateSubscription(final String endpoint, final Strin return getOrCreateSubscriptionAdminClient() .onFailure(thr -> LOG.debug("admin client creation failed", thr)) .compose(client -> getSubscription(subscription, client) - .recover(thr -> createSubscription(projectId, subscription, topic, client))); + .recover(thr -> { + if (thr instanceof NotFoundException) { + return createSubscription(subscription, topic, client); + } else { + return Future.failedFuture(thr); + } + })); } private Future getSubscription(final SubscriptionName subscription, final SubscriptionAdminClient client) { - try { - return Future.succeededFuture(client.getSubscription(subscription).getName()); - } catch (ApiException e) { - return Future.failedFuture(new ServerErrorException(503, - String.format("Could not get subscription %s.", subscription.getSubscription()), e)); - } + return vertx.executeBlocking(promise -> promise.tryComplete(client.getSubscription(subscription).getName())); } - private Future createSubscription(final String projectId, final SubscriptionName subscription, - final TopicName topic, final SubscriptionAdminClient client) { - try { - final Subscription request = Subscription.newBuilder() - .setName(subscription.toString()) - .setTopic(topic.toString()) - .setPushConfig(PushConfig.getDefaultInstance()) - .setAckDeadlineSeconds(0) - .setMessageRetentionDuration(Durations.fromMillis(MESSAGE_RETENTION)) - .setExpirationPolicy(ExpirationPolicy.getDefaultInstance()) - .build(); - final Subscription createdSubscription = client.createSubscription(request); - if (createdSubscription == null) { - LOG.debug("Creating subscription failed [subscription: {}, topic: {}, project: {}]", subscription, - topic, - projectId); - return Future.failedFuture("Subscription creation failed."); - } - LOG.debug("Subscription {} created successfully.", subscription); - return Future.succeededFuture(subscription.getSubscription()); - } catch (ApiException e) { - LOG.debug("Error creating subscription {} for topic {} on project {}", subscription, topic, projectId, e); - return Future.failedFuture(new ServerErrorException(503, - String.format("Subscription creation failed for subscription %s.", subscription.getSubscription()), - e)); - } + private Future createSubscription(final SubscriptionName subscription, final TopicName topic, + final SubscriptionAdminClient client) { + final Subscription request = Subscription.newBuilder() + .setName(subscription.toString()) + .setTopic(topic.toString()) + .setPushConfig(PushConfig.getDefaultInstance()) + .setAckDeadlineSeconds(0) + .setMessageRetentionDuration(Durations.fromMillis(MESSAGE_RETENTION)) + .setExpirationPolicy(ExpirationPolicy.getDefaultInstance()) + .build(); + final Future createdSubscription = vertx + .executeBlocking(promise -> promise.tryComplete(client.createSubscription(request).getName())); + createdSubscription.onSuccess(sub -> LOG.debug("Subscription {} created successfully.", subscription)) + .onFailure( + thr -> LOG.debug("Creating subscription failed [subscription: {}, topic: {}, project: {}]", + subscription, topic, projectId)); + return createdSubscription; } /** diff --git a/clients/pubsub-common/src/main/java/org/eclipse/hono/client/pubsub/PubSubBasedAdminClientManagerFactory.java b/clients/pubsub-common/src/main/java/org/eclipse/hono/client/pubsub/PubSubBasedAdminClientManagerFactory.java new file mode 100644 index 0000000000..433c4808c8 --- /dev/null +++ b/clients/pubsub-common/src/main/java/org/eclipse/hono/client/pubsub/PubSubBasedAdminClientManagerFactory.java @@ -0,0 +1,27 @@ +/******************************************************************************* + * Copyright (c) 2023 Contributors to the Eclipse Foundation + * + * See the NOTICE file(s) distributed with this work for additional + * information regarding copyright ownership. + * + * This program and the accompanying materials are made available under the + * terms of the Eclipse Public License 2.0 which is available at + * http://www.eclipse.org/legal/epl-2.0 + * + * SPDX-License-Identifier: EPL-2.0 + *******************************************************************************/ + +package org.eclipse.hono.client.pubsub; + +/** + * A factory to create PubSubBasedAdminClientManager instances. + */ +public interface PubSubBasedAdminClientManagerFactory { + + /** + * Creates a new PubSubBasedAdminClientManager instance. + * + * @return A new PubSubBasedAdminClientManager instance. + */ + PubSubBasedAdminClientManager createAdminClientManager(); +} diff --git a/clients/pubsub-common/src/main/java/org/eclipse/hono/client/pubsub/PubSubBasedAdminClientManagerFactoryImpl.java b/clients/pubsub-common/src/main/java/org/eclipse/hono/client/pubsub/PubSubBasedAdminClientManagerFactoryImpl.java new file mode 100644 index 0000000000..44c147c9c6 --- /dev/null +++ b/clients/pubsub-common/src/main/java/org/eclipse/hono/client/pubsub/PubSubBasedAdminClientManagerFactoryImpl.java @@ -0,0 +1,53 @@ +/******************************************************************************* + * Copyright (c) 2023 Contributors to the Eclipse Foundation + * + * See the NOTICE file(s) distributed with this work for additional + * information regarding copyright ownership. + * + * This program and the accompanying materials are made available under the + * terms of the Eclipse Public License 2.0 which is available at + * http://www.eclipse.org/legal/epl-2.0 + * + * SPDX-License-Identifier: EPL-2.0 + *******************************************************************************/ + +package org.eclipse.hono.client.pubsub; + +import java.util.Objects; + +import com.google.api.gax.core.CredentialsProvider; + +import io.vertx.core.Vertx; +import jakarta.enterprise.context.ApplicationScoped; + +/** + * A factory to create PubSubBasedAdminClientManagerImpl instances. + */ +@ApplicationScoped +public class PubSubBasedAdminClientManagerFactoryImpl implements PubSubBasedAdminClientManagerFactory { + + private final String projectId; + private final Vertx vertx; + private final CredentialsProvider credentialsProvider; + + /** + * Creates a new PubSubBasedAdminClientManagerFactory. + * + * @param pubSubConfigProperties The Pub/Sub config properties containing the Google project ID. + * @param vertx The Vert.x instance to use. + * @param credentialsProvider The Google credentials provider to use. + * @throws NullPointerException if vertx, projectId or credentialsProvider is {@code null}. + */ + public PubSubBasedAdminClientManagerFactoryImpl(final PubSubConfigProperties pubSubConfigProperties, + final CredentialsProvider credentialsProvider, final Vertx vertx) { + Objects.requireNonNull(pubSubConfigProperties); + this.projectId = Objects.requireNonNull(pubSubConfigProperties.getProjectId()); + this.credentialsProvider = Objects.requireNonNull(credentialsProvider); + this.vertx = Objects.requireNonNull(vertx); + } + + @Override + public PubSubBasedAdminClientManager createAdminClientManager() { + return new PubSubBasedAdminClientManager(projectId, credentialsProvider, vertx); + } +} diff --git a/clients/pubsub-common/src/main/java/org/eclipse/hono/client/pubsub/PubSubConfigProperties.java b/clients/pubsub-common/src/main/java/org/eclipse/hono/client/pubsub/PubSubConfigProperties.java index d77278561e..39e807b3a9 100644 --- a/clients/pubsub-common/src/main/java/org/eclipse/hono/client/pubsub/PubSubConfigProperties.java +++ b/clients/pubsub-common/src/main/java/org/eclipse/hono/client/pubsub/PubSubConfigProperties.java @@ -10,11 +10,15 @@ * * SPDX-License-Identifier: EPL-2.0 *******************************************************************************/ + package org.eclipse.hono.client.pubsub; +import jakarta.inject.Singleton; + /** * Common configuration properties required for access to Pub/Sub. */ +@Singleton public final class PubSubConfigProperties { private String projectId = null; diff --git a/services/device-registry-base/src/main/java/org/eclipse/hono/deviceregistry/app/CommonConfigPropertiesProducer.java b/services/device-registry-base/src/main/java/org/eclipse/hono/deviceregistry/app/CommonConfigPropertiesProducer.java index 531dd98d26..0afe5ed92f 100644 --- a/services/device-registry-base/src/main/java/org/eclipse/hono/deviceregistry/app/CommonConfigPropertiesProducer.java +++ b/services/device-registry-base/src/main/java/org/eclipse/hono/deviceregistry/app/CommonConfigPropertiesProducer.java @@ -19,8 +19,6 @@ import org.eclipse.hono.client.kafka.producer.KafkaProducerOptions; import org.eclipse.hono.client.kafka.producer.MessagingKafkaProducerConfigProperties; import org.eclipse.hono.client.notification.kafka.NotificationKafkaProducerConfigProperties; -import org.eclipse.hono.client.pubsub.PubSubConfigProperties; -import org.eclipse.hono.client.pubsub.PubSubPublisherOptions; import org.eclipse.hono.deviceregistry.service.device.AutoProvisionerConfigOptions; import org.eclipse.hono.deviceregistry.service.device.AutoProvisionerConfigProperties; @@ -49,12 +47,6 @@ ClientConfigProperties downstreamSenderProperties( return result; } - @Produces - @Singleton - PubSubConfigProperties pubSubConfigProperties(final PubSubPublisherOptions options) { - return new PubSubConfigProperties(options); - } - @Produces @Singleton MessagingKafkaProducerConfigProperties eventKafkaProducerClientOptions(