diff --git a/protocol-gateway/mqtt-protocol-gateway-template/src/main/java/org/eclipse/hono/gateway/sdk/mqtt2amqp/AbstractMqttProtocolGateway.java b/protocol-gateway/mqtt-protocol-gateway-template/src/main/java/org/eclipse/hono/gateway/sdk/mqtt2amqp/AbstractMqttProtocolGateway.java index aa775d56..5ded80b0 100644 --- a/protocol-gateway/mqtt-protocol-gateway-template/src/main/java/org/eclipse/hono/gateway/sdk/mqtt2amqp/AbstractMqttProtocolGateway.java +++ b/protocol-gateway/mqtt-protocol-gateway-template/src/main/java/org/eclipse/hono/gateway/sdk/mqtt2amqp/AbstractMqttProtocolGateway.java @@ -19,7 +19,6 @@ import java.security.cert.X509Certificate; import java.util.ArrayList; import java.util.Arrays; -import java.util.HashMap; import java.util.LinkedHashSet; import java.util.List; import java.util.Map; @@ -32,10 +31,8 @@ import org.apache.qpid.proton.message.Message; import org.eclipse.hono.auth.Device; import org.eclipse.hono.client.ClientErrorException; -import org.eclipse.hono.client.HonoConnection; import org.eclipse.hono.client.MessageConsumer; import org.eclipse.hono.client.ServiceInvocationException; -import org.eclipse.hono.client.device.amqp.AmqpAdapterClientFactory; import org.eclipse.hono.config.ClientConfigProperties; import org.eclipse.hono.gateway.sdk.mqtt2amqp.downstream.CommandResponseMessage; import org.eclipse.hono.gateway.sdk.mqtt2amqp.downstream.DownstreamMessage; @@ -84,7 +81,7 @@ public abstract class AbstractMqttProtocolGateway extends AbstractVerticle { private final ClientConfigProperties amqpClientConfig; private final MqttProtocolGatewayConfig mqttGatewayConfig; - private final Map clientFactoryPerTenant = new HashMap<>(); + private final MultiTenantConnectionManager tenantConnectionManager; private MqttServer server; @@ -104,11 +101,30 @@ public abstract class AbstractMqttProtocolGateway extends AbstractVerticle { */ public AbstractMqttProtocolGateway(final ClientConfigProperties amqpClientConfig, final MqttProtocolGatewayConfig mqttGatewayConfig) { + + this(amqpClientConfig, mqttGatewayConfig, new MultiTenantConnectionManagerImpl()); + } + + /** + * This constructor is only visible for testing purposes. + * + * @param amqpClientConfig The AMQP client configuration. + * @param mqttGatewayConfig The configuration of the protocol gateway. + * @param tenantConnectionManager The tenant connection manager to be used. + * @throws NullPointerException if any of the parameters is {@code null}. + * @see AbstractMqttProtocolGateway#AbstractMqttProtocolGateway(ClientConfigProperties, MqttProtocolGatewayConfig) + */ + AbstractMqttProtocolGateway(final ClientConfigProperties amqpClientConfig, + final MqttProtocolGatewayConfig mqttGatewayConfig, + final MultiTenantConnectionManager tenantConnectionManager) { + Objects.requireNonNull(amqpClientConfig); Objects.requireNonNull(mqttGatewayConfig); + Objects.requireNonNull(tenantConnectionManager); this.amqpClientConfig = amqpClientConfig; this.mqttGatewayConfig = mqttGatewayConfig; + this.tenantConnectionManager = tenantConnectionManager; } /** @@ -355,7 +371,11 @@ final void handleEndpointConnection(final MqttEndpoint endpoint) { : Future.succeededFuture(authenticateDevice)); authAttempt - .compose(this::connectGatewayToAmqpAdapter) + .compose(authenticatedDevice -> { + final String tenantId = authenticatedDevice.getTenantId(); + return getTenantConfig(tenantId) + .compose(config -> connectGatewayToAmqpAdapter(tenantId, config, endpoint)); + }) .onComplete(result -> { if (result.succeeded()) { registerHandlers(endpoint, authAttempt.result()); @@ -414,12 +434,10 @@ private Future authenticateWithUsernameAndPassword(final MqttEndpoint en } } - private Future connectGatewayToAmqpAdapter(final Device authenticatedDevice) { - - final String tenantId = authenticatedDevice.getTenantId(); + private Future getTenantConfig(final String tenantId) { if (amqpClientConfig.getUsername() != null && amqpClientConfig.getPassword() != null) { - return connectGatewayToAmqpAdapter(tenantId, amqpClientConfig); + return Future.succeededFuture(amqpClientConfig); } else { return provideGatewayCredentials(tenantId) .compose(credentials -> { @@ -427,43 +445,19 @@ private Future connectGatewayToAmqpAdapter(final Device authenticatedDevic tenantConfig.setUsername(credentials.getUsername()); tenantConfig.setPassword(credentials.getPassword()); - return connectGatewayToAmqpAdapter(tenantId, tenantConfig); + return Future.succeededFuture(tenantConfig); }); } } - private Future connectGatewayToAmqpAdapter(final String tenantId, final ClientConfigProperties clientConfig) { - - final AmqpAdapterClientFactory amqpAdapterClientFactory = clientFactoryPerTenant.get(tenantId); - if (amqpAdapterClientFactory != null) { - return amqpAdapterClientFactory.isConnected(clientConfig.getConnectTimeout()); - } else { - - final AmqpAdapterClientFactory factory = createTenantClientFactory(tenantId, clientConfig); - clientFactoryPerTenant.put(tenantId, factory); - - return factory.connect() - .map(con -> { - log.debug("Connected to AMQP adapter"); - return null; - }); - } - } + private Future connectGatewayToAmqpAdapter(final String tenantId, + final ClientConfigProperties clientConfig, + final MqttEndpoint endpoint) { + return tenantConnectionManager.connect(tenantId, vertx, clientConfig) + .onSuccess(v -> tenantConnectionManager.addEndpoint(tenantId, endpoint)) + .onFailure(e -> log.info("Failed to connect to Hono [tenant-id: {}, username: {}]", tenantId, + clientConfig.getUsername())); - /** - * Returns a new {@link AmqpAdapterClientFactory} with a new AMQP connection for the given tenant. - *

- * This method is only visible for testing purposes. - * - * @param tenantId The tenant to be connected. - * @param clientConfig The client properties to use for the connection. - * @return The factory. Note that the underlying AMQP connection will not be established until - * {@link AmqpAdapterClientFactory#connect()} is invoked. - */ - AmqpAdapterClientFactory createTenantClientFactory(final String tenantId, - final ClientConfigProperties clientConfig) { - final HonoConnection connection = HonoConnection.newConnection(vertx, clientConfig); - return AmqpAdapterClientFactory.create(connection, tenantId); } private void registerHandlers(final MqttEndpoint endpoint, final Device authenticatedDevice) { @@ -473,21 +467,27 @@ private void registerHandlers(final MqttEndpoint endpoint, final Device authenti MqttDownstreamContext.fromPublishPacket(message, endpoint, authenticatedDevice))); final CommandSubscriptionsManager cmdSubscriptionsManager = createCommandHandler(vertx); - endpoint.closeHandler(v -> close(endpoint, cmdSubscriptionsManager)); + endpoint.closeHandler(v -> cleanupConnections(endpoint, cmdSubscriptionsManager, authenticatedDevice)); endpoint.publishAcknowledgeHandler(cmdSubscriptionsManager::handlePubAck); endpoint.subscribeHandler(msg -> onSubscribe(endpoint, authenticatedDevice, msg, cmdSubscriptionsManager)); endpoint.unsubscribeHandler(msg -> onUnsubscribe(endpoint, authenticatedDevice, msg, cmdSubscriptionsManager)); } - private void close(final MqttEndpoint endpoint, final CommandSubscriptionsManager cmdSubscriptionsManager) { + private void cleanupConnections(final MqttEndpoint endpoint, + final CommandSubscriptionsManager cmdSubscriptionsManager, + final Device authenticatedDevice) { + + log.info("closing connection to device {}", authenticatedDevice.toString()); + onDeviceConnectionClose(endpoint); cmdSubscriptionsManager.removeAllSubscriptions(); - if (endpoint.isConnected()) { - log.debug("closing connection with client [client ID: {}]", endpoint.clientIdentifier()); - endpoint.close(); - } else { - log.trace("connection to client is already closed"); + + final String tenantId = authenticatedDevice.getTenantId(); + final boolean amqpLinkClosed = tenantConnectionManager.closeEndpoint(tenantId, endpoint); + + if (amqpLinkClosed) { + log.info("closing AMQP connection for tenant [{}]", tenantId); } } @@ -585,8 +585,7 @@ private void onUploadFailure(final MqttDownstreamContext ctx, final Throwable ca } if (ctx.deviceEndpoint().isConnected()) { - log.info("closing connection to device {}", ctx.authenticatedDevice().toString()); - ctx.deviceEndpoint().close(); + ctx.deviceEndpoint().close(); // cleanupConnections() will be called by close handler } } @@ -594,7 +593,7 @@ private Future sendTelemetry(final String tenantId, final String final Map properties, final byte[] payload, final String contentType, final boolean waitForOutcome) { - return clientFactoryPerTenant.get(tenantId).getOrCreateTelemetrySender() + return tenantConnectionManager.getOrCreateTelemetrySender(tenantId) .compose(sender -> { if (waitForOutcome) { log.trace( @@ -616,7 +615,7 @@ private Future sendEvent(final String tenantId, final String dev log.trace("sending event message [tenantId: {}, deviceId: {}, contentType: {}, properties: {}]", tenantId, deviceId, contentType, properties); - return clientFactoryPerTenant.get(tenantId).getOrCreateEventSender() + return tenantConnectionManager.getOrCreateEventSender(tenantId) .compose(sender -> sender.send(deviceId, payload, contentType, properties)); } @@ -628,7 +627,7 @@ private Future sendCommandResponse(final String tenantId, final "sending command response [tenantId: {}, deviceId: {}, targetAddress: {}, correlationId: {}, status: {}, contentType: {}, properties: {}]", tenantId, deviceId, targetAddress, correlationId, status, contentType, properties); - return clientFactoryPerTenant.get(tenantId).getOrCreateCommandResponseSender() + return tenantConnectionManager.getOrCreateCommandResponseSender(tenantId) .compose(sender -> sender.sendCommandResponse(deviceId, targetAddress, correlationId, status, payload, contentType, properties)); } @@ -733,7 +732,9 @@ private void onUnsubscribe(final MqttEndpoint endpoint, final Device authenticat private Future createCommandConsumer(final MqttEndpoint endpoint, final CommandSubscriptionsManager cmdSubscriptionsManager, final Device authenticatedDevice) { - return clientFactoryPerTenant.get(authenticatedDevice.getTenantId()).createDeviceSpecificCommandConsumer( + + return tenantConnectionManager.createDeviceSpecificCommandConsumer( + authenticatedDevice.getTenantId(), authenticatedDevice.getDeviceId(), cmd -> handleCommand(endpoint, cmd, cmdSubscriptionsManager, authenticatedDevice)); } @@ -887,6 +888,9 @@ public final void stop(final Promise stopPromise) { final Promise stopTracker = Promise.promise(); beforeShutdown(stopTracker); + + tenantConnectionManager.closeAllTenants(); + stopTracker.future().onComplete(v -> { if (server != null) { server.close(stopPromise); diff --git a/protocol-gateway/mqtt-protocol-gateway-template/src/main/java/org/eclipse/hono/gateway/sdk/mqtt2amqp/MultiTenantConnectionManager.java b/protocol-gateway/mqtt-protocol-gateway-template/src/main/java/org/eclipse/hono/gateway/sdk/mqtt2amqp/MultiTenantConnectionManager.java new file mode 100644 index 00000000..3f867ab7 --- /dev/null +++ b/protocol-gateway/mqtt-protocol-gateway-template/src/main/java/org/eclipse/hono/gateway/sdk/mqtt2amqp/MultiTenantConnectionManager.java @@ -0,0 +1,118 @@ +/******************************************************************************* + * Copyright (c) 2020 Contributors to the Eclipse Foundation + * + * See the NOTICE file(s) distributed with this work for additional + * information regarding copyright ownership. + * + * This program and the accompanying materials are made available under the + * terms of the Eclipse Public License 2.0 which is available at + * http://www.eclipse.org/legal/epl-2.0 + * + * SPDX-License-Identifier: EPL-2.0 + *******************************************************************************/ + +package org.eclipse.hono.gateway.sdk.mqtt2amqp; + +import java.util.function.Consumer; + +import org.apache.qpid.proton.message.Message; +import org.eclipse.hono.client.MessageConsumer; +import org.eclipse.hono.client.device.amqp.AmqpAdapterClientFactory; +import org.eclipse.hono.client.device.amqp.CommandResponder; +import org.eclipse.hono.client.device.amqp.EventSender; +import org.eclipse.hono.client.device.amqp.TelemetrySender; +import org.eclipse.hono.config.ClientConfigProperties; + +import io.vertx.core.Future; +import io.vertx.core.Vertx; +import io.vertx.mqtt.MqttEndpoint; + +/** + * Manages connections for multiple tenants. + */ +public interface MultiTenantConnectionManager { + + /** + * Connect to Hono's AMQP adapter with the given configuration. + * + * @param tenantId The tenant to connect. + * @param vertx The Vert.x instance to use for the connection. + * @param clientConfig The configuration of the connection. + * @return a succeeded future if the connection could be established within the time frame configured with + * {@link ClientConfigProperties#getConnectTimeout()}, a failed future otherwise. + */ + Future connect(String tenantId, Vertx vertx, ClientConfigProperties clientConfig); + + /** + * Adds an MQTT endpoint for the given tenant. + * + * @param tenantId The tenant to which the endpoint belongs. + * @param mqttEndpoint The endpoint to be added. + */ + void addEndpoint(String tenantId, MqttEndpoint mqttEndpoint); + + /** + * Closes the given MQTT endpoint and if there are no other open endpoints for this tenant, it closes the + * corresponding AMQP connection. + * + * @param tenantId The tenant to which the endpoint belongs. + * @param mqttEndpoint The endpoint to be closed. + * @return {@code true} if the AMQP connection and all endpoints have been closed. + */ + boolean closeEndpoint(String tenantId, MqttEndpoint mqttEndpoint); + + /** + * Closes all connections, MQTT connections as well as AMQP for all tenants. + */ + void closeAllTenants(); + + /** + * Gets a client for sending telemetry data to Hono's AMQP protocol adapter. + * + * @param tenantId The tenant to which the sender belongs. + * @return a future with the open sender or a failed future. + * @see AmqpAdapterClientFactory#getOrCreateTelemetrySender() + */ + Future getOrCreateTelemetrySender(String tenantId); + + /** + * Gets a client for sending events to Hono's AMQP protocol adapter. + * + * @param tenantId The tenant to which the sender belongs. + * @return a future with the open sender or a failed future. + * @see AmqpAdapterClientFactory#getOrCreateTelemetrySender() + */ + Future getOrCreateEventSender(String tenantId); + + /** + * Gets a client for sending command responses to Hono's AMQP protocol adapter. + * + * @param tenantId The tenant to which the sender belongs. + * @return a future with the open sender or a failed future. + * @see AmqpAdapterClientFactory#getOrCreateTelemetrySender() + */ + Future getOrCreateCommandResponseSender(String tenantId); + + /** + * Creates a client for consuming commands from Hono's AMQP protocol adapter for a specific device. + * + * @param tenantId The tenant to which the sender belongs. + * @param deviceId The device to consume commands for. + * @param messageHandler The handler to invoke with every command received. + * @return a future with the open sender or a failed future. + * @see AmqpAdapterClientFactory#getOrCreateTelemetrySender() + */ + Future createDeviceSpecificCommandConsumer(String tenantId, String deviceId, + Consumer messageHandler); + + /** + * Creates a client for consuming commands from Hono's AMQP protocol adapter for all devices of this tenant. + * + * @param tenantId The tenant to which the sender belongs. + * @param messageHandler The handler to invoke with every command received. + * @return a future with the open sender or a failed future. + * @see AmqpAdapterClientFactory#getOrCreateTelemetrySender() + */ + Future createCommandConsumer(String tenantId, Consumer messageHandler); + +} diff --git a/protocol-gateway/mqtt-protocol-gateway-template/src/main/java/org/eclipse/hono/gateway/sdk/mqtt2amqp/MultiTenantConnectionManagerImpl.java b/protocol-gateway/mqtt-protocol-gateway-template/src/main/java/org/eclipse/hono/gateway/sdk/mqtt2amqp/MultiTenantConnectionManagerImpl.java new file mode 100644 index 00000000..3ec95a2a --- /dev/null +++ b/protocol-gateway/mqtt-protocol-gateway-template/src/main/java/org/eclipse/hono/gateway/sdk/mqtt2amqp/MultiTenantConnectionManagerImpl.java @@ -0,0 +1,141 @@ +/******************************************************************************* + * Copyright (c) 2020 Contributors to the Eclipse Foundation + * + * See the NOTICE file(s) distributed with this work for additional + * information regarding copyright ownership. + * + * This program and the accompanying materials are made available under the + * terms of the Eclipse Public License 2.0 which is available at + * http://www.eclipse.org/legal/epl-2.0 + * + * SPDX-License-Identifier: EPL-2.0 + *******************************************************************************/ + +package org.eclipse.hono.gateway.sdk.mqtt2amqp; + +import java.util.HashMap; +import java.util.Map; +import java.util.function.Consumer; + +import org.apache.qpid.proton.message.Message; +import org.eclipse.hono.client.MessageConsumer; +import org.eclipse.hono.client.device.amqp.AmqpAdapterClientFactory; +import org.eclipse.hono.client.device.amqp.CommandResponder; +import org.eclipse.hono.client.device.amqp.EventSender; +import org.eclipse.hono.client.device.amqp.TelemetrySender; +import org.eclipse.hono.config.ClientConfigProperties; + +import io.vertx.core.Future; +import io.vertx.core.Vertx; +import io.vertx.mqtt.MqttEndpoint; + +/** + * Tracks MQTT connections per tenant and closes the AMQP connection automatically when the last MQTT connection of the + * tenant is closed. + *

+ * Note: {@link #connect(String, Vertx, ClientConfigProperties)} needs to be invoked before using the instance. + */ +public class MultiTenantConnectionManagerImpl implements MultiTenantConnectionManager { + + private final Map connectionsPerTenant = new HashMap<>(); + + @Override + public Future connect(final String tenantId, final Vertx vertx, final ClientConfigProperties clientConfig) { + + connectionsPerTenant.computeIfAbsent(tenantId, k -> new TenantConnections(k, vertx, clientConfig).connect()); + + return getTenantConnections(tenantId) + .isConnected(clientConfig.getConnectTimeout()) + .onFailure(ex -> { + final TenantConnections failedTenant = connectionsPerTenant.remove(tenantId); + if (failedTenant != null) { + failedTenant.closeAllConnections(); + } + }); + } + + @Override + public void addEndpoint(final String tenantId, final MqttEndpoint mqttEndpoint) { + getTenantConnections(tenantId).addEndpoint(mqttEndpoint); + } + + @Override + public boolean closeEndpoint(final String tenantId, final MqttEndpoint mqttEndpoint) { + + final boolean amqpLinkClosed = getTenantConnections(tenantId).closeEndpoint(mqttEndpoint); + if (amqpLinkClosed) { + connectionsPerTenant.remove(tenantId); + } + + return amqpLinkClosed; + } + + @Override + public void closeAllTenants() { + connectionsPerTenant.forEach((k, connections) -> connections.closeAllConnections()); + connectionsPerTenant.clear(); + } + + @Override + public Future getOrCreateTelemetrySender(final String tenantId) { + try { + return getAmqpAdapterClientFactory(tenantId).getOrCreateTelemetrySender(); + } catch (Exception ex) { + return Future.failedFuture(ex); + } + } + + @Override + public Future getOrCreateEventSender(final String tenantId) { + try { + return getAmqpAdapterClientFactory(tenantId).getOrCreateEventSender(); + } catch (Exception ex) { + return Future.failedFuture(ex); + } + } + + @Override + public Future getOrCreateCommandResponseSender(final String tenantId) { + try { + return getAmqpAdapterClientFactory(tenantId).getOrCreateCommandResponseSender(); + } catch (Exception ex) { + return Future.failedFuture(ex); + } + } + + @Override + public Future createDeviceSpecificCommandConsumer(final String tenantId, final String deviceId, + final Consumer messageHandler) { + + try { + return getAmqpAdapterClientFactory(tenantId).createDeviceSpecificCommandConsumer(deviceId, messageHandler); + } catch (Exception ex) { + return Future.failedFuture(ex); + } + } + + @Override + public Future createCommandConsumer(final String tenantId, + final Consumer messageHandler) { + + try { + return getAmqpAdapterClientFactory(tenantId).createCommandConsumer(messageHandler); + } catch (Exception ex) { + return Future.failedFuture(ex); + } + } + + private TenantConnections getTenantConnections(final String tenantId) throws IllegalArgumentException { + final TenantConnections tenantConnections = connectionsPerTenant.get(tenantId); + if (tenantConnections == null) { + throw new IllegalArgumentException("tenant [" + tenantId + "] is not connected"); + } else { + return tenantConnections; + } + } + + private AmqpAdapterClientFactory getAmqpAdapterClientFactory(final String tenantId) + throws IllegalStateException, IllegalArgumentException { + return getTenantConnections(tenantId).getAmqpAdapterClientFactory(); + } +} diff --git a/protocol-gateway/mqtt-protocol-gateway-template/src/main/java/org/eclipse/hono/gateway/sdk/mqtt2amqp/TenantConnections.java b/protocol-gateway/mqtt-protocol-gateway-template/src/main/java/org/eclipse/hono/gateway/sdk/mqtt2amqp/TenantConnections.java new file mode 100644 index 00000000..71bffca8 --- /dev/null +++ b/protocol-gateway/mqtt-protocol-gateway-template/src/main/java/org/eclipse/hono/gateway/sdk/mqtt2amqp/TenantConnections.java @@ -0,0 +1,115 @@ +/******************************************************************************* + * Copyright (c) 2020 Contributors to the Eclipse Foundation + * + * See the NOTICE file(s) distributed with this work for additional + * information regarding copyright ownership. + * + * This program and the accompanying materials are made available under the + * terms of the Eclipse Public License 2.0 which is available at + * http://www.eclipse.org/legal/epl-2.0 + * + * SPDX-License-Identifier: EPL-2.0 + *******************************************************************************/ + +package org.eclipse.hono.gateway.sdk.mqtt2amqp; + +import java.util.ArrayList; +import java.util.List; + +import org.eclipse.hono.client.HonoConnection; +import org.eclipse.hono.client.device.amqp.AmqpAdapterClientFactory; +import org.eclipse.hono.config.ClientConfigProperties; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import io.vertx.core.Future; +import io.vertx.core.Vertx; +import io.vertx.mqtt.MqttEndpoint; + +/** + * Manages all connections of one tenant, MQTT connections of devices as well as the AMQP connection to Hono's AMQP + * adapter. + *

+ * Note: do not re-use an instance if it is already closed. + */ +class TenantConnections { + + // visible for testing + final List mqttEndpoints = new ArrayList<>(); + + private final AmqpAdapterClientFactory amqpAdapterClientFactory; + private final Logger log = LoggerFactory.getLogger(getClass()); + + private boolean closed = false; + + TenantConnections(final String tenantId, final Vertx vertx, final ClientConfigProperties clientConfig) { + this(AmqpAdapterClientFactory.create(HonoConnection.newConnection(vertx, clientConfig), tenantId)); + } + + TenantConnections(final AmqpAdapterClientFactory amqpAdapterClientFactory) { + this.amqpAdapterClientFactory = amqpAdapterClientFactory; + } + + public TenantConnections connect() { + getAmqpAdapterClientFactory().connect().onSuccess(con -> log.debug("Connected to AMQP adapter")); + return this; + } + + public void addEndpoint(final MqttEndpoint mqttEndpoint) { + checkNotClosed(); + mqttEndpoints.add(mqttEndpoint); + } + + public boolean closeEndpoint(final MqttEndpoint mqttEndpoint) { + + closeEndpointIfConnected(mqttEndpoint); + + mqttEndpoints.remove(mqttEndpoint); + + if (mqttEndpoints.isEmpty()) { + closeThisInstance(); + } + + return closed; + } + + /** + * Closes all MQTT endpoints and the AMQP connection. + */ + public void closeAllConnections() { + log.info("closing all AMQP connections"); + + mqttEndpoints.forEach(this::closeEndpointIfConnected); + mqttEndpoints.clear(); + closeThisInstance(); + } + + private void closeEndpointIfConnected(final MqttEndpoint mqttEndpoint) { + if (mqttEndpoint.isConnected()) { + log.debug("closing connection with client [client ID: {}]", mqttEndpoint.clientIdentifier()); + mqttEndpoint.close(); + } else { + log.trace("connection to client is already closed"); + } + } + + private void closeThisInstance() { + getAmqpAdapterClientFactory().disconnect(); + closed = true; + } + + public Future isConnected(final long connectTimeout) { + return getAmqpAdapterClientFactory().isConnected(connectTimeout); + } + + public AmqpAdapterClientFactory getAmqpAdapterClientFactory() throws IllegalStateException { + checkNotClosed(); + return amqpAdapterClientFactory; + } + + private void checkNotClosed() throws IllegalStateException { + if (closed) { + throw new IllegalStateException("all connections for this tenant are already closed"); + } + } +} diff --git a/protocol-gateway/mqtt-protocol-gateway-template/src/test/java/org/eclipse/hono/gateway/sdk/mqtt2amqp/AbstractMqttProtocolGatewayTest.java b/protocol-gateway/mqtt-protocol-gateway-template/src/test/java/org/eclipse/hono/gateway/sdk/mqtt2amqp/AbstractMqttProtocolGatewayTest.java index 90801ca9..4c4971cd 100644 --- a/protocol-gateway/mqtt-protocol-gateway-template/src/test/java/org/eclipse/hono/gateway/sdk/mqtt2amqp/AbstractMqttProtocolGatewayTest.java +++ b/protocol-gateway/mqtt-protocol-gateway-template/src/test/java/org/eclipse/hono/gateway/sdk/mqtt2amqp/AbstractMqttProtocolGatewayTest.java @@ -14,6 +14,8 @@ package org.eclipse.hono.gateway.sdk.mqtt2amqp; import static org.assertj.core.api.Assertions.assertThat; +import static org.eclipse.hono.gateway.sdk.mqtt2amqp.TestMqttProtocolGateway.GW_PASSWORD; +import static org.eclipse.hono.gateway.sdk.mqtt2amqp.TestMqttProtocolGateway.GW_USERNAME; import static org.mockito.ArgumentMatchers.any; import static org.mockito.ArgumentMatchers.anyInt; import static org.mockito.ArgumentMatchers.anyLong; @@ -41,7 +43,6 @@ import org.apache.qpid.proton.message.Message; import org.apache.qpid.proton.message.impl.MessageImpl; import org.eclipse.hono.client.HonoConnection; -import org.eclipse.hono.client.device.amqp.AmqpAdapterClientFactory; import org.eclipse.hono.client.device.amqp.CommandResponder; import org.eclipse.hono.client.device.amqp.EventSender; import org.eclipse.hono.client.device.amqp.TelemetrySender; @@ -96,7 +97,7 @@ public class AbstractMqttProtocolGatewayTest { private Vertx vertx; private ProtonSender protonSender; private NetServer netServer; - private AmqpAdapterClientFactory amqpAdapterClientFactory; + private MultiTenantConnectionManager tenantConnectionManager; private Consumer commandHandler; /** @@ -104,12 +105,13 @@ public class AbstractMqttProtocolGatewayTest { */ @BeforeEach public void setUp() { - amqpAdapterClientFactory = mock(AmqpAdapterClientFactory.class); netServer = mock(NetServer.class); vertx = mock(Vertx.class); protonSender = mockProtonSender(); - when(amqpAdapterClientFactory.connect()).thenReturn(Future.succeededFuture()); + tenantConnectionManager = mock(MultiTenantConnectionManager.class); + when(tenantConnectionManager.connect(anyString(), any(), any())).thenReturn(Future.succeededFuture()); + when(tenantConnectionManager.closeEndpoint(anyString(), any())).thenReturn(true); amqpClientConfig = new ClientConfigProperties(); final HonoConnection connection = mockHonoConnection(vertx, amqpClientConfig, protonSender); @@ -117,21 +119,21 @@ public void setUp() { final Future eventSender = AmqpAdapterClientEventSenderImpl .createWithAnonymousLinkAddress(connection, TestMqttProtocolGateway.TENANT_ID, s -> { }); - when(amqpAdapterClientFactory.getOrCreateEventSender()).thenReturn(eventSender); + when(tenantConnectionManager.getOrCreateEventSender(anyString())).thenReturn(eventSender); final Future telemetrySender = AmqpAdapterClientTelemetrySenderImpl .createWithAnonymousLinkAddress(connection, TestMqttProtocolGateway.TENANT_ID, s -> { }); - when(amqpAdapterClientFactory.getOrCreateTelemetrySender()).thenReturn(telemetrySender); + when(tenantConnectionManager.getOrCreateTelemetrySender(anyString())).thenReturn(telemetrySender); final Future commandResponseSender = AmqpAdapterClientCommandResponseSender .createWithAnonymousLinkAddress(connection, TestMqttProtocolGateway.TENANT_ID, s -> { }); - when(amqpAdapterClientFactory.getOrCreateCommandResponseSender()).thenReturn(commandResponseSender); + when(tenantConnectionManager.getOrCreateCommandResponseSender(anyString())).thenReturn(commandResponseSender); - when(amqpAdapterClientFactory.createDeviceSpecificCommandConsumer(anyString(), any())) + when(tenantConnectionManager.createDeviceSpecificCommandConsumer(anyString(), anyString(), any())) .thenAnswer(invocation -> { - final Consumer msgHandler = invocation.getArgument(1); + final Consumer msgHandler = invocation.getArgument(2); setCommandHandler(msgHandler); return AmqpAdapterClientCommandConsumer.create(connection, TestMqttProtocolGateway.TENANT_ID, TestMqttProtocolGateway.DEVICE_ID, @@ -382,7 +384,7 @@ public void testConnectWithClientCertSucceeds() { // GIVEN a protocol gateway configured with a trust anchor final TestMqttProtocolGateway gateway = new TestMqttProtocolGateway(amqpClientConfig, - new MqttProtocolGatewayConfig(), vertx, amqpAdapterClientFactory) { + new MqttProtocolGatewayConfig(), vertx, tenantConnectionManager) { @Override protected Future> getTrustAnchors(final List certificates) { @@ -407,7 +409,7 @@ public void testAuthenticationWithClientCertFailsIfTrustAnchorDoesNotMatch() { // GIVEN a protocol gateway configured with a trust anchor final TestMqttProtocolGateway gateway = new TestMqttProtocolGateway(amqpClientConfig, - new MqttProtocolGatewayConfig(), vertx, amqpAdapterClientFactory) { + new MqttProtocolGatewayConfig(), vertx, tenantConnectionManager) { @Override protected Future> getTrustAnchors(final List certificates) { @@ -434,7 +436,8 @@ protected Future> getTrustAnchors(final List c public void testConnectFailsWhenGatewayCouldNotConnect() { // GIVEN a protocol gateway where establishing a connection to Hono's AMQP adapter fails - when(amqpAdapterClientFactory.connect()).thenReturn(Future.failedFuture("Connect failed")); + when(tenantConnectionManager.connect(anyString(), any(), any())) + .thenReturn(Future.failedFuture("Connect failed")); final TestMqttProtocolGateway gateway = createGateway(); @@ -457,26 +460,25 @@ public void testConnectWithGatewayCredentialsResolvedDynamicallySucceeds() { // ... and where the gateway credentials are resolved by the implementation final ClientConfigProperties configWithoutCredentials = new ClientConfigProperties(); final AbstractMqttProtocolGateway gateway = new TestMqttProtocolGateway(configWithoutCredentials, - new MqttProtocolGatewayConfig(), vertx, amqpAdapterClientFactory) { + new MqttProtocolGatewayConfig(), vertx, tenantConnectionManager); - @Override - AmqpAdapterClientFactory createTenantClientFactory(final String tenantId, - final ClientConfigProperties clientConfig) { + // WHEN the gateway connects + connectTestDevice(gateway); - // THEN the AMQP connection is authenticated with the provided credentials... - assertThat(clientConfig.getUsername()).isEqualTo(GW_USERNAME); - assertThat(clientConfig.getPassword()).isEqualTo(GW_PASSWORD); + ArgumentCaptor configPropertiesArgumentCaptor = ArgumentCaptor + .forClass(ClientConfigProperties.class); - // ... and not with the credentials from the configuration - assertThat(clientConfig.getUsername()).isNotEqualTo(configWithoutCredentials.getUsername()); - assertThat(clientConfig.getPassword()).isNotEqualTo(configWithoutCredentials.getPassword()); + verify(tenantConnectionManager).connect(anyString(), any(), configPropertiesArgumentCaptor.capture()); - return super.createTenantClientFactory(tenantId, clientConfig); - } - }; + final ClientConfigProperties clientConfig = configPropertiesArgumentCaptor.getValue(); - // WHEN the gateway connects - connectTestDevice(gateway); + // THEN the AMQP connection is authenticated with the provided credentials... + assertThat(clientConfig.getUsername()).isEqualTo(GW_USERNAME); + assertThat(clientConfig.getPassword()).isEqualTo(GW_PASSWORD); + + // ... and not with the credentials from the configuration + assertThat(clientConfig.getUsername()).isNotEqualTo(configWithoutCredentials.getUsername()); + assertThat(clientConfig.getPassword()).isNotEqualTo(configWithoutCredentials.getPassword()); } @@ -496,27 +498,25 @@ public void testConfiguredCredentialsTakePrecedenceOverImplementation() { // GIVEN a protocol gateway where the AMQP config does contains credentials final AbstractMqttProtocolGateway gateway = new TestMqttProtocolGateway(configWithCredentials, - new MqttProtocolGatewayConfig(), vertx, amqpAdapterClientFactory) { + new MqttProtocolGatewayConfig(), vertx, tenantConnectionManager); - @Override - AmqpAdapterClientFactory createTenantClientFactory(final String tenantId, - final ClientConfigProperties clientConfig) { + // WHEN the gateway connects + connectTestDevice(gateway); - // THEN the AMQP connection is authenticated with the configured credentials... - assertThat(clientConfig.getUsername()).isEqualTo(username); - assertThat(clientConfig.getPassword()).isEqualTo(password); + ArgumentCaptor configPropertiesArgumentCaptor = ArgumentCaptor + .forClass(ClientConfigProperties.class); - // ... and not with the credentials from the implementation - assertThat(clientConfig.getUsername()).isNotEqualTo(GW_USERNAME); - assertThat(clientConfig.getPassword()).isNotEqualTo(GW_PASSWORD); + verify(tenantConnectionManager).connect(anyString(), any(), configPropertiesArgumentCaptor.capture()); - return super.createTenantClientFactory(tenantId, clientConfig); - } - }; + final ClientConfigProperties clientConfig = configPropertiesArgumentCaptor.getValue(); - // WHEN the gateway connects - connectTestDevice(gateway); + // THEN the AMQP connection is authenticated with the configured credentials... + assertThat(clientConfig.getUsername()).isEqualTo(username); + assertThat(clientConfig.getPassword()).isEqualTo(password); + // ... and not with the credentials from the implementation + assertThat(clientConfig.getUsername()).isNotEqualTo(GW_USERNAME); + assertThat(clientConfig.getPassword()).isNotEqualTo(GW_PASSWORD); } /** @@ -629,7 +629,7 @@ public void testTelemetryMessage() { // GIVEN a protocol gateway that sends every MQTT publish messages as telemetry messages downstream and a // connected MQTT endpoint final TestMqttProtocolGateway gateway = new TestMqttProtocolGateway(amqpClientConfig, - new MqttProtocolGatewayConfig(), vertx, amqpAdapterClientFactory) { + new MqttProtocolGatewayConfig(), vertx, tenantConnectionManager) { @Override protected Future onPublishedMessage(final MqttDownstreamContext ctx) { @@ -666,7 +666,7 @@ public void testCommandResponse() { // GIVEN a protocol gateway that sends every MQTT publish messages as command response messages downstream and a // connected MQTT endpoint final TestMqttProtocolGateway gateway = new TestMqttProtocolGateway(amqpClientConfig, - new MqttProtocolGatewayConfig(), vertx, amqpAdapterClientFactory) { + new MqttProtocolGatewayConfig(), vertx, tenantConnectionManager) { @Override protected Future onPublishedMessage(final MqttDownstreamContext ctx) { @@ -963,7 +963,7 @@ private TestMqttProtocolGateway createGateway() { } private TestMqttProtocolGateway createGateway(final MqttProtocolGatewayConfig gatewayServerConfig) { - return new TestMqttProtocolGateway(amqpClientConfig, gatewayServerConfig, vertx, amqpAdapterClientFactory); + return new TestMqttProtocolGateway(amqpClientConfig, gatewayServerConfig, vertx, tenantConnectionManager); } } diff --git a/protocol-gateway/mqtt-protocol-gateway-template/src/test/java/org/eclipse/hono/gateway/sdk/mqtt2amqp/MultiTenantConnectionManagerImplTest.java b/protocol-gateway/mqtt-protocol-gateway-template/src/test/java/org/eclipse/hono/gateway/sdk/mqtt2amqp/MultiTenantConnectionManagerImplTest.java new file mode 100644 index 00000000..640ac3c8 --- /dev/null +++ b/protocol-gateway/mqtt-protocol-gateway-template/src/test/java/org/eclipse/hono/gateway/sdk/mqtt2amqp/MultiTenantConnectionManagerImplTest.java @@ -0,0 +1,121 @@ +package org.eclipse.hono.gateway.sdk.mqtt2amqp; + +import static org.assertj.core.api.Assertions.assertThat; +import static org.mockito.Mockito.mock; +import static org.mockito.Mockito.when; + +import org.eclipse.hono.config.ClientConfigProperties; +import org.junit.jupiter.api.Assertions; +import org.junit.jupiter.api.BeforeEach; +import org.junit.jupiter.api.Test; + +import io.vertx.core.Context; +import io.vertx.core.Vertx; +import io.vertx.mqtt.MqttEndpoint; + +/** + * Verifies behavior of {@link MultiTenantConnectionManagerImpl}. + */ +public class MultiTenantConnectionManagerImplTest { + + private static final String TENANT_ID = "test-tenant"; + private MultiTenantConnectionManagerImpl connectionManager; + private MqttEndpoint endpoint; + private Vertx vertx; + + /** + * Sets up common fixture. + */ + @BeforeEach + public void setUp() { + connectionManager = new MultiTenantConnectionManagerImpl(); + endpoint = mock(MqttEndpoint.class); + + vertx = mock(Vertx.class); + when(vertx.getOrCreateContext()).thenReturn(mock(Context.class)); + } + + /** + * Verifies that closing the last endpoint of the tenant, closes the AMQP connection. + */ + @Test + public void amqpConnectionIsClosedWhenClosingLastEndpoint() { + + connectionManager.connect(TENANT_ID, vertx, new ClientConfigProperties()); + connectionManager.addEndpoint(TENANT_ID, endpoint); + + assertThat(connectionManager.closeEndpoint(TENANT_ID, endpoint)).isTrue(); + + } + + /** + * Verifies that closing an endpoint while there are others for the same tenant, the AMQP connection is not closed. + */ + @Test + public void amqpConnectionIsOpenWhenClosingEndpointThatIsNotTheLastOne() { + + connectionManager.connect(TENANT_ID, vertx, new ClientConfigProperties()); + connectionManager.addEndpoint(TENANT_ID, endpoint); + connectionManager.addEndpoint(TENANT_ID, mock(MqttEndpoint.class)); + + assertThat(connectionManager.closeEndpoint(TENANT_ID, endpoint)).isFalse(); + + } + + /** + * Verifies that all tenants are closed when closeAllTenants() is invoked. + */ + @Test + public void addEndpointFailsIfInstanceIsClosed() { + + connectionManager.connect(TENANT_ID, vertx, new ClientConfigProperties()); + + connectionManager.closeAllTenants(); + + Assertions.assertThrows(IllegalArgumentException.class, + () -> connectionManager.addEndpoint(TENANT_ID, endpoint)); + } + + /** + * Verifies that trying to add an endpoint without connecting the tenant first, throws an exception. + */ + @Test + public void addEndpointFailsIfNotConnected() { + Assertions.assertThrows(IllegalArgumentException.class, + () -> connectionManager.addEndpoint(TENANT_ID, endpoint)); + } + + /** + * Verifies that trying to close an endpoint without connecting the tenant first, throws an exception. + */ + @Test + public void closeEndpointFailsIfNotConnected() { + Assertions.assertThrows(IllegalArgumentException.class, + () -> connectionManager.closeEndpoint(TENANT_ID, endpoint)); + } + + /** + * Verifies that calling one of the methods that delegate to AmqpAdapterClientFactory fails if the tenant is not + * connected. + */ + @Test + public void futureFailsIfNotConnected() { + + assertThat(connectionManager.getOrCreateTelemetrySender(TENANT_ID).cause()) + .isInstanceOf(IllegalArgumentException.class); + + assertThat(connectionManager.getOrCreateEventSender(TENANT_ID).cause()) + .isInstanceOf(IllegalArgumentException.class); + + assertThat(connectionManager.getOrCreateCommandResponseSender(TENANT_ID).cause()) + .isInstanceOf(IllegalArgumentException.class); + + assertThat(connectionManager.createDeviceSpecificCommandConsumer(TENANT_ID, "device-id", msg -> { + }).cause()).isInstanceOf(IllegalArgumentException.class); + + assertThat(connectionManager.createCommandConsumer(TENANT_ID, msg -> { + }).cause()).isInstanceOf(IllegalArgumentException.class); + + } + +} diff --git a/protocol-gateway/mqtt-protocol-gateway-template/src/test/java/org/eclipse/hono/gateway/sdk/mqtt2amqp/TenantConnectionsTest.java b/protocol-gateway/mqtt-protocol-gateway-template/src/test/java/org/eclipse/hono/gateway/sdk/mqtt2amqp/TenantConnectionsTest.java new file mode 100644 index 00000000..99840de0 --- /dev/null +++ b/protocol-gateway/mqtt-protocol-gateway-template/src/test/java/org/eclipse/hono/gateway/sdk/mqtt2amqp/TenantConnectionsTest.java @@ -0,0 +1,118 @@ +package org.eclipse.hono.gateway.sdk.mqtt2amqp; + +import static org.assertj.core.api.Assertions.assertThat; +import static org.mockito.ArgumentMatchers.eq; +import static org.mockito.Mockito.mock; +import static org.mockito.Mockito.verify; +import static org.mockito.Mockito.when; + +import org.eclipse.hono.client.HonoConnection; +import org.eclipse.hono.client.device.amqp.AmqpAdapterClientFactory; +import org.junit.jupiter.api.Assertions; +import org.junit.jupiter.api.BeforeEach; +import org.junit.jupiter.api.Test; + +import io.vertx.core.Future; +import io.vertx.mqtt.MqttEndpoint; + +/** + * Verifies behavior of {@link TenantConnections}. + */ +public class TenantConnectionsTest { + + private TenantConnections tenantConnections; + private MqttEndpoint endpoint; + private AmqpAdapterClientFactory amqpAdapterClientFactory; + + /** + * Sets up common fixture. + */ + @BeforeEach + public void setUp() { + amqpAdapterClientFactory = mock(AmqpAdapterClientFactory.class); + + tenantConnections = new TenantConnections(amqpAdapterClientFactory); + endpoint = mock(MqttEndpoint.class); + } + + /** + * Verifies that the connect method returns the instance. + */ + @Test + public void connectReturnsTheInstance() { + when(amqpAdapterClientFactory.connect()).thenReturn(Future.succeededFuture(mock(HonoConnection.class))); + + assertThat(tenantConnections.connect()).isEqualTo(tenantConnections); + } + + /** + * Verifies that adding an endpoint works. + */ + @Test + public void containsEndpointWhenAdding() { + tenantConnections.addEndpoint(endpoint); + + assertThat(tenantConnections.mqttEndpoints.size()).isEqualTo(1); + assertThat(tenantConnections.mqttEndpoints.contains(endpoint)).isTrue(); + } + + /** + * Verifies that removing an endpoint works. + */ + @Test + public void endpointIsRemovedWhenClosingEndpoint() { + tenantConnections.addEndpoint(endpoint); + + tenantConnections.closeEndpoint(endpoint); + + assertThat(tenantConnections.mqttEndpoints.isEmpty()).isTrue(); + } + + /** + * Verifies that the instance is closed when the last endpoint is closed. + */ + @Test + public void instanceIsClosedWhenClosingLastEndpoint() { + tenantConnections.addEndpoint(endpoint); + + tenantConnections.closeEndpoint(endpoint); + + Assertions.assertThrows(IllegalStateException.class, () -> tenantConnections.getAmqpAdapterClientFactory()); + } + + /** + * Verifies that the instance is NOT closed when an endpoint is closed while other endpoints are still open. + */ + @Test + public void instanceIsOpenWhenClosingEndpointThatIsNotTheLastOne() { + tenantConnections.addEndpoint(endpoint); + tenantConnections.addEndpoint(mock(MqttEndpoint.class)); + + tenantConnections.closeEndpoint(endpoint); + + assertThat(tenantConnections.getAmqpAdapterClientFactory()).isNotNull(); + } + + /** + * Verifies that the instance is closed when closeAllConnections() is invoked. + */ + @Test + public void instanceIsClosedWhenInvokingClose() { + + tenantConnections.getAmqpAdapterClientFactory(); + + tenantConnections.closeAllConnections(); + + Assertions.assertThrows(IllegalStateException.class, () -> tenantConnections.getAmqpAdapterClientFactory()); + } + + /** + * Verifies that the isConnected() method delegates the check to the client factory. + */ + @Test + public void isConnectedDelegatesToClientFactory() { + tenantConnections.isConnected(5L); + verify(amqpAdapterClientFactory).isConnected(eq(5L)); + } + +} diff --git a/protocol-gateway/mqtt-protocol-gateway-template/src/test/java/org/eclipse/hono/gateway/sdk/mqtt2amqp/TestMqttProtocolGateway.java b/protocol-gateway/mqtt-protocol-gateway-template/src/test/java/org/eclipse/hono/gateway/sdk/mqtt2amqp/TestMqttProtocolGateway.java index 76036cd4..6831a818 100644 --- a/protocol-gateway/mqtt-protocol-gateway-template/src/test/java/org/eclipse/hono/gateway/sdk/mqtt2amqp/TestMqttProtocolGateway.java +++ b/protocol-gateway/mqtt-protocol-gateway-template/src/test/java/org/eclipse/hono/gateway/sdk/mqtt2amqp/TestMqttProtocolGateway.java @@ -17,14 +17,7 @@ import java.util.concurrent.atomic.AtomicBoolean; import org.eclipse.hono.auth.Device; -import org.eclipse.hono.client.device.amqp.AmqpAdapterClientFactory; import org.eclipse.hono.config.ClientConfigProperties; -import org.eclipse.hono.gateway.sdk.mqtt2amqp.Command; -import org.eclipse.hono.gateway.sdk.mqtt2amqp.CommandSubscriptionsManager; -import org.eclipse.hono.gateway.sdk.mqtt2amqp.Credentials; -import org.eclipse.hono.gateway.sdk.mqtt2amqp.MqttCommandContext; -import org.eclipse.hono.gateway.sdk.mqtt2amqp.MqttDownstreamContext; -import org.eclipse.hono.gateway.sdk.mqtt2amqp.MqttProtocolGatewayConfig; import org.eclipse.hono.gateway.sdk.mqtt2amqp.downstream.DownstreamMessage; import org.eclipse.hono.gateway.sdk.mqtt2amqp.downstream.EventMessage; @@ -66,15 +59,15 @@ class TestMqttProtocolGateway extends AbstractMqttProtocolGateway { private final AtomicBoolean startupComplete = new AtomicBoolean(); private final AtomicBoolean shutdownStarted = new AtomicBoolean(); private final AtomicBoolean connectionClosed = new AtomicBoolean(); - private final AmqpAdapterClientFactory amqpAdapterClientFactory; private CommandSubscriptionsManager commandSubscriptionsManager; TestMqttProtocolGateway(final ClientConfigProperties clientConfigProperties, - final MqttProtocolGatewayConfig mqttProtocolGatewayConfig, final Vertx vertx, - final AmqpAdapterClientFactory amqpAdapterClientFactory) { - super(clientConfigProperties, mqttProtocolGatewayConfig); - this.amqpAdapterClientFactory = amqpAdapterClientFactory; + final MqttProtocolGatewayConfig mqttProtocolGatewayConfig, + final Vertx vertx, + final MultiTenantConnectionManager tenantConnectionManager) { + + super(clientConfigProperties, mqttProtocolGatewayConfig, tenantConnectionManager); super.vertx = vertx; } @@ -115,12 +108,6 @@ public CommandSubscriptionsManager getCommandSubscriptionsManager() { return commandSubscriptionsManager; } - @Override - AmqpAdapterClientFactory createTenantClientFactory(final String tenantId, - final ClientConfigProperties clientConfig) { - return amqpAdapterClientFactory; - } - @Override protected Future authenticateDevice(final String username, final String password, final String clientId) {