Skip to content

Commit

Permalink
[#3549] Improved getOrCreateTopic/Subscription flow for Pub/Sub messa…
Browse files Browse the repository at this point in the history
…ging infrastructure and changed the subscription expiration to never

Signed-off-by: Matthias Kaemmer <[email protected]>
  • Loading branch information
mattkaem committed Nov 2, 2023
1 parent e8c8c5c commit 0561fa0
Show file tree
Hide file tree
Showing 6 changed files with 136 additions and 162 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -59,6 +59,7 @@
import org.eclipse.hono.client.kafka.producer.KafkaProducerOptions;
import org.eclipse.hono.client.kafka.producer.MessagingKafkaProducerConfigProperties;
import org.eclipse.hono.client.notification.kafka.NotificationKafkaConsumerConfigProperties;
import org.eclipse.hono.client.pubsub.PubSubBasedAdminClientManager;
import org.eclipse.hono.client.pubsub.PubSubConfigProperties;
import org.eclipse.hono.client.pubsub.PubSubMessageHelper;
import org.eclipse.hono.client.pubsub.PubSubPublisherOptions;
Expand Down Expand Up @@ -441,6 +442,10 @@ protected void setCollaborators(final AbstractProtocolAdapterBase<?> adapter) {
vertx,
pubSubConfigProperties.getProjectId(),
provider);
final var pubSubBasedAdminClientManager = new PubSubBasedAdminClientManager(
pubSubConfigProperties,
provider,
vertx);

commandConsumerFactory.registerInternalCommandConsumer(
(id, handlers) -> new PubSubBasedInternalCommandConsumer(
Expand All @@ -451,8 +456,9 @@ protected void setCollaborators(final AbstractProtocolAdapterBase<?> adapter) {
tenantClient,
tracer,
subscriberFactory,
pubSubConfigProperties.getProjectId(),
provider));
pubSubBasedAdminClientManager,
null
));
}
}, () -> LOG.error("Could not initialize Pub/Sub based internal command consumer, no Credentials Provider present."));
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,7 @@
*
* SPDX-License-Identifier: EPL-2.0
*/

package org.eclipse.hono.client.command.pubsub;

import java.net.HttpURLConnection;
Expand All @@ -33,7 +34,6 @@
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import com.google.api.gax.core.CredentialsProvider;
import com.google.cloud.pubsub.v1.AckReplyConsumer;
import com.google.cloud.pubsub.v1.MessageReceiver;
import com.google.pubsub.v1.PubsubMessage;
Expand Down Expand Up @@ -63,7 +63,7 @@ public class PubSubBasedInternalCommandConsumer implements InternalCommandConsum
private final LifecycleStatus lifecycleStatus = new LifecycleStatus();
private final PubSubBasedAdminClientManager adminClientManager;
private final Vertx vertx;
private MessageReceiver receiver;
private final MessageReceiver receiver;

/**
* Creates a Pub/Sub based internal command consumer.
Expand All @@ -75,54 +75,10 @@ public class PubSubBasedInternalCommandConsumer implements InternalCommandConsum
* @param tenantClient The client to use for retrieving tenant configuration data.
* @param tracer The OpenTracing tracer.
* @param subscriberFactory The subscriber factory for creating Pub/Sub subscribers for receiving messages.
* @param projectId The identifier of the Google Cloud Project to connect to.
* @param credentialsProvider The provider for credentials to use for authenticating to the Pub/Sub service.
* @throws NullPointerException If any of these parameters are {@code null}.
*/
public PubSubBasedInternalCommandConsumer(
final CommandResponseSender commandResponseSender,
final Vertx vertx,
final String adapterInstanceId,
final CommandHandlers commandHandlers,
final TenantClient tenantClient,
final Tracer tracer,
final PubSubSubscriberFactory subscriberFactory,
final String projectId,
final CredentialsProvider credentialsProvider) {
Objects.requireNonNull(projectId);
Objects.requireNonNull(credentialsProvider);
this.vertx = Objects.requireNonNull(vertx);
this.commandResponseSender = Objects.requireNonNull(commandResponseSender);
this.adapterInstanceId = Objects.requireNonNull(adapterInstanceId);
this.commandHandlers = Objects.requireNonNull(commandHandlers);
this.tenantClient = Objects.requireNonNull(tenantClient);
this.tracer = Objects.requireNonNull(tracer);
this.subscriberFactory = Objects.requireNonNull(subscriberFactory);
this.adminClientManager = new PubSubBasedAdminClientManager(projectId, credentialsProvider);
createReceiver();
adminClientManager
.getOrCreateTopic(CommandConstants.INTERNAL_COMMAND_ENDPOINT, adapterInstanceId)
.onFailure(thr -> log.error("Could not create topic for endpoint {} and {}",
CommandConstants.INTERNAL_COMMAND_ENDPOINT, adapterInstanceId, thr))
.compose(t -> adminClientManager.getOrCreateSubscription(CommandConstants.INTERNAL_COMMAND_ENDPOINT, adapterInstanceId))
.onFailure(thr -> log.error("Could not create subscription for endpoint {} and {}",
CommandConstants.INTERNAL_COMMAND_ENDPOINT, adapterInstanceId, thr))
.onSuccess(s -> subscriberFactory.getOrCreateSubscriber(s, receiver));
}

/**
* Creates a Pub/Sub based internal command consumer. To be used for Unittests.
*
* @param commandResponseSender The sender used to send command responses.
* @param vertx The Vert.x instance to use.
* @param adapterInstanceId The adapter instance id.
* @param commandHandlers The command handlers to choose from for handling a received command.
* @param tenantClient The client to use for retrieving tenant configuration data.
* @param tracer The OpenTracing tracer.
* @param subscriberFactory The subscriber factory for creating Pub/Sub subscribers for receiving messages.
* @param adminClientManager The Pub/Sub based admin client manager to manage topics and subscriptions.
* @param adminClientManager The factory to create Pub/Sub based admin client manager to manage topics and
* subscriptions.
* @param receiver The message receiver used to process the received message.
* @throws NullPointerException If any of these parameters are {@code null}.
* @throws NullPointerException If any of these parameters except receiver are {@code null}.
*/
public PubSubBasedInternalCommandConsumer(
final CommandResponseSender commandResponseSender,
Expand All @@ -142,15 +98,11 @@ public PubSubBasedInternalCommandConsumer(
this.tracer = Objects.requireNonNull(tracer);
this.subscriberFactory = Objects.requireNonNull(subscriberFactory);
this.adminClientManager = Objects.requireNonNull(adminClientManager);
this.receiver = Objects.requireNonNull(receiver);
adminClientManager
.getOrCreateTopic(CommandConstants.INTERNAL_COMMAND_ENDPOINT, adapterInstanceId)
.onFailure(thr -> log.error("Could not create topic for endpoint {} and {}",
CommandConstants.INTERNAL_COMMAND_ENDPOINT, adapterInstanceId, thr))
.compose(t -> adminClientManager.getOrCreateSubscription(CommandConstants.INTERNAL_COMMAND_ENDPOINT, adapterInstanceId))
.onFailure(thr -> log.error("Could not create subscription for endpoint {} and {}",
CommandConstants.INTERNAL_COMMAND_ENDPOINT, adapterInstanceId, thr))
.onSuccess(s -> subscriberFactory.getOrCreateSubscriber(s, receiver));
this.receiver = receiver != null ? receiver : createReceiver();
}

private MessageReceiver createReceiver() {
return this::handleCommandMessage;
}

@Override
Expand Down Expand Up @@ -191,21 +143,26 @@ public Future<Void> start() {
return Future.failedFuture(new IllegalStateException("subscriber is already started/stopping"));
}

final String subscriptionId = PubSubMessageHelper.getTopicName(
CommandConstants.INTERNAL_COMMAND_ENDPOINT,
adapterInstanceId);
return subscriberFactory
.getOrCreateSubscriber(subscriptionId, receiver)
.subscribe(true)
return adminClientManager
.getOrCreateTopic(CommandConstants.INTERNAL_COMMAND_ENDPOINT, adapterInstanceId)
.onFailure(thr -> log.error("Could not create topic for endpoint {} and {}",
CommandConstants.INTERNAL_COMMAND_ENDPOINT, adapterInstanceId, thr))
.compose(t -> adminClientManager.getOrCreateSubscription(CommandConstants.INTERNAL_COMMAND_ENDPOINT,
adapterInstanceId))
.onComplete(v -> vertx.executeBlocking(promise -> {
adminClientManager.closeAdminClients();
promise.complete();
}))
.onFailure(thr -> log.error("Could not create subscription for endpoint {} and {}",
CommandConstants.INTERNAL_COMMAND_ENDPOINT, adapterInstanceId, thr))
.compose(s -> subscriberFactory.getOrCreateSubscriber(PubSubMessageHelper.getTopicName(
CommandConstants.INTERNAL_COMMAND_ENDPOINT,
adapterInstanceId), receiver).subscribe(true))
.onSuccess(s -> lifecycleStatus.setStarted())
.onFailure(
e -> log.warn("Error starting Internal Command Consumer for adapter {}", adapterInstanceId, e));
}

private void createReceiver() {
receiver = this::handleCommandMessage;
}

Future<Void> handleCommandMessage(final PubsubMessage message, final AckReplyConsumer consumer) {
consumer.ack();
final PubSubBasedCommand command;
Expand Down Expand Up @@ -260,13 +217,8 @@ Future<Void> handleCommandMessage(final PubsubMessage message, final AckReplyCon

@Override
public Future<Void> stop() {
return lifecycleStatus.runStopAttempt(() -> Future.all(
subscriberFactory.closeSubscriber(CommandConstants.INTERNAL_COMMAND_ENDPOINT, adapterInstanceId),
vertx.executeBlocking(promise -> {
adminClientManager.closeAdminClients();
promise.complete();
})
).mapEmpty());
return lifecycleStatus.runStopAttempt(
() -> subscriberFactory.closeSubscriber(CommandConstants.INTERNAL_COMMAND_ENDPOINT, adapterInstanceId));
}

}
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,7 @@
*
* SPDX-License-Identifier: EPL-2.0
*/

package org.eclipse.hono.client.command.pubsub;

import static org.mockito.ArgumentMatchers.any;
Expand Down
Loading

0 comments on commit 0561fa0

Please sign in to comment.