Skip to content

Commit

Permalink
Merge pull request #1788 from calohmn/PR/hono_connection_add_hono_ten…
Browse files Browse the repository at this point in the history
…ant_id

#1748 Add honoTenantId configuration for HonoConnection.
  • Loading branch information
thjaeckle authored Nov 6, 2023
2 parents 23f057a + 19f9c06 commit d008959
Show file tree
Hide file tree
Showing 10 changed files with 646 additions and 78 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -16,8 +16,6 @@
import java.text.MessageFormat;
import java.util.Set;

import org.eclipse.ditto.connectivity.model.Connection;
import org.eclipse.ditto.connectivity.model.ConnectionId;
import org.eclipse.ditto.connectivity.model.HonoAddressAlias;
import org.eclipse.ditto.connectivity.model.UserPasswordCredentials;
import org.eclipse.ditto.connectivity.service.config.DefaultHonoConfig;
Expand All @@ -35,8 +33,6 @@ public final class DefaultHonoConnectionFactory extends HonoConnectionFactory {

private final HonoConfig honoConfig;

private ConnectionId connectionId;

/**
* Constructs a {@code DefaultHonoConnectionFactory} for the specified arguments.
*
Expand All @@ -48,11 +44,6 @@ public DefaultHonoConnectionFactory(final ActorSystem actorSystem, final Config
honoConfig = new DefaultHonoConfig(actorSystem);
}

@Override
protected void preConversion(final Connection honoConnection) {
connectionId = honoConnection.getId();
}

@Override
public URI getBaseUri() {
return honoConfig.getBaseUri();
Expand Down Expand Up @@ -86,13 +77,13 @@ protected UserPasswordCredentials getCredentials() {
@Override
protected String resolveSourceAddress(final HonoAddressAlias honoAddressAlias) {
return MessageFormat.format("hono.{0}.{1}",
honoAddressAlias.getAliasValue(), connectionId);
honoAddressAlias.getAliasValue(), getHonoTenantId());
}

@Override
protected String resolveTargetAddress(final HonoAddressAlias honoAddressAlias) {
return MessageFormat.format("hono.{0}.{1}/'{{thing:id}}'",
honoAddressAlias.getAliasValue(), connectionId);
honoAddressAlias.getAliasValue(), getHonoTenantId());
}

}
Original file line number Diff line number Diff line change
Expand Up @@ -63,6 +63,13 @@
*/
public abstract class HonoConnectionFactory implements DittoExtensionPoint {

/**
* The name of the property in the {@code specificConfig} object containing the Hono tenant identifier.
*/
protected static final String SPEC_CONFIG_HONO_TENANT_ID = "honoTenantId";

private String honoTenantId;

/**
* Constructs a {@code HonoConnectionFactory}.
*/
Expand Down Expand Up @@ -112,6 +119,9 @@ public Connection getHonoConnection(final Connection connection) {
connection.getConnectionType())
);

honoTenantId = connection.getSpecificConfig()
.getOrDefault(SPEC_CONFIG_HONO_TENANT_ID, connection.getId().toString());

preConversion(connection);

return ConnectivityModelFactory.newConnectionBuilder(connection)
Expand All @@ -134,6 +144,18 @@ protected void preConversion(final Connection honoConnection) {
// Do nothing by default.
}

/**
* Get the Hono tenant identifier associated with the connection.
*
* @return The Hono tenant identifier.
*/
protected String getHonoTenantId() {
if (honoTenantId == null) {
throw new IllegalStateException("getHonoTenantId invoked before tenant id got determined");
}
return honoTenantId;
}

protected abstract URI getBaseUri();

protected abstract boolean isValidateCertificates();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -30,7 +30,6 @@

import org.assertj.core.api.Assertions;
import org.eclipse.ditto.connectivity.model.Connection;
import org.eclipse.ditto.connectivity.model.ConnectionId;
import org.eclipse.ditto.connectivity.model.ConnectivityModelFactory;
import org.eclipse.ditto.connectivity.model.HonoAddressAlias;
import org.eclipse.ditto.connectivity.model.ReplyTarget;
Expand Down Expand Up @@ -58,7 +57,7 @@ public final class DefaultHonoConnectionFactoryTest {

private HonoConfig honoConfig;

private static Connection generateConnectionObjectFromJsonFile( String fileName) throws IOException {
private static Connection generateConnectionObjectFromJsonFile(final String fileName) throws IOException {
final var testClassLoader = DefaultHonoConnectionFactoryTest.class.getClassLoader();
try (final var connectionJsonFileStreamReader = new InputStreamReader(
testClassLoader.getResourceAsStream(fileName)
Expand All @@ -85,13 +84,26 @@ public void newInstanceWithNullActorSystemThrowsException() {
public void getHonoConnectionWithCustomMappingsReturnsExpected() throws IOException {
final var userProvidedHonoConnection =
generateConnectionObjectFromJsonFile("hono-connection-custom-test.json");
final var expectedHonoConnection =
generateConnectionObjectFromJsonFile("hono-connection-custom-expected.json");
final var expectedAdaptedConnection =
generateConnectionObjectFromJsonFile("hono-connection-custom-expected-after-adaptation.json");

final var underTest =
new DefaultHonoConnectionFactory(actorSystemResource.getActorSystem(), ConfigFactory.empty());

assertThat(underTest.getHonoConnection(userProvidedHonoConnection)).isEqualTo(expectedHonoConnection);
assertThat(underTest.getHonoConnection(userProvidedHonoConnection)).isEqualTo(expectedAdaptedConnection);
}

@Test
public void getHonoConnectionWithImplicitTenantIdAndCustomMappingsReturnsExpected() throws IOException {
final var userProvidedHonoConnection =
generateConnectionObjectFromJsonFile("hono-connection-implicit-tenant-custom-test.json");
final var expectedAdaptedConnection =
generateConnectionObjectFromJsonFile("hono-connection-implicit-tenant-custom-expected-after-adaptation.json");

final var underTest =
new DefaultHonoConnectionFactory(actorSystemResource.getActorSystem(), ConfigFactory.empty());

assertThat(underTest.getHonoConnection(userProvidedHonoConnection)).isEqualTo(expectedAdaptedConnection);
}

@Test
Expand All @@ -103,11 +115,11 @@ public void getHonoConnectionWithDefaultMappingReturnsExpected() throws IOExcept
new DefaultHonoConnectionFactory(actorSystemResource.getActorSystem(), ConfigFactory.empty());

assertThat(underTest.getHonoConnection(userProvidedHonoConnection))
.isEqualTo(getExpectedHonoConnection(userProvidedHonoConnection));
.isEqualTo(getExpectedAdaptedHonoConnection(userProvidedHonoConnection));
}

@SuppressWarnings("unchecked")
private Connection getExpectedHonoConnection(final Connection originalConnection) {
private Connection getExpectedAdaptedHonoConnection(final Connection originalConnection) {
final var sourcesByAddress = getSourcesByAddress(originalConnection.getSources());
final var commandReplyTargetHeaderMapping = ConnectivityModelFactory.newHeaderMapping(Map.of(
"correlation-id", "{{ header:correlation-id }}",
Expand All @@ -122,6 +134,9 @@ private Connection getExpectedHonoConnection(final Connection originalConnection
"subject", "{{ header:subject | fn:default(topic:action-subject) }}"
);
final var connectionId = originalConnection.getId();
final String honoTenantId = originalConnection.getSpecificConfig()
.getOrDefault(DefaultHonoConnectionFactory.SPEC_CONFIG_HONO_TENANT_ID, connectionId.toString());
final String expectedResolvedCommandTargetAddress = getExpectedResolvedCommandTargetAddress(honoTenantId);
return ConnectivityModelFactory.newConnectionBuilder(originalConnection)
.uri(honoConfig.getBaseUri().toString().replaceFirst("(\\S+://)(\\S+)",
"$1" + URLEncoder.encode(honoConfig.getUserPasswordCredentials().getUsername(), StandardCharsets.UTF_8)
Expand All @@ -135,22 +150,22 @@ private Connection getExpectedHonoConnection(final Connection originalConnection
)
.setSources(List.of(
ConnectivityModelFactory.newSourceBuilder(sourcesByAddress.get(TELEMETRY.getAliasValue()))
.addresses(Set.of(getExpectedResolvedSourceAddress(TELEMETRY, connectionId)))
.addresses(Set.of(getExpectedResolvedSourceAddress(TELEMETRY, honoTenantId)))
.replyTarget(ReplyTarget.newBuilder()
.address(getExpectedResolvedCommandTargetAddress(connectionId))
.address(expectedResolvedCommandTargetAddress)
.headerMapping(commandReplyTargetHeaderMapping)
.build())
.build(),
ConnectivityModelFactory.newSourceBuilder(sourcesByAddress.get(EVENT.getAliasValue()))
.addresses(Set.of(getExpectedResolvedSourceAddress(EVENT, connectionId)))
.addresses(Set.of(getExpectedResolvedSourceAddress(EVENT, honoTenantId)))
.replyTarget(ReplyTarget.newBuilder()
.address(getExpectedResolvedCommandTargetAddress(connectionId))
.address(expectedResolvedCommandTargetAddress)
.headerMapping(commandReplyTargetHeaderMapping)
.build())
.build(),
ConnectivityModelFactory.newSourceBuilder(
sourcesByAddress.get(COMMAND_RESPONSE.getAliasValue()))
.addresses(Set.of(getExpectedResolvedSourceAddress(COMMAND_RESPONSE, connectionId)))
.addresses(Set.of(getExpectedResolvedSourceAddress(COMMAND_RESPONSE, honoTenantId)))
.headerMapping(ConnectivityModelFactory.newHeaderMapping(Map.of(
"correlation-id", "{{ header:correlation-id }}",
"status", "{{ header:status }}"
Expand All @@ -159,8 +174,8 @@ private Connection getExpectedHonoConnection(final Connection originalConnection
))
.setTargets(List.of(
ConnectivityModelFactory.newTargetBuilder(targets.get(0))
.address(getExpectedResolvedCommandTargetAddress(connectionId))
.originalAddress(getExpectedResolvedCommandTargetAddress(connectionId))
.address(expectedResolvedCommandTargetAddress)
.originalAddress(expectedResolvedCommandTargetAddress)
.headerMapping(ConnectivityModelFactory.newHeaderMapping(
Stream.concat(
basicAdditionalTargetHeaderMappingEntries.entrySet().stream(),
Expand All @@ -170,8 +185,8 @@ private Connection getExpectedHonoConnection(final Connection originalConnection
))
.build(),
ConnectivityModelFactory.newTargetBuilder(targets.get(1))
.address(getExpectedResolvedCommandTargetAddress(connectionId))
.originalAddress(getExpectedResolvedCommandTargetAddress(connectionId))
.address(expectedResolvedCommandTargetAddress)
.originalAddress(expectedResolvedCommandTargetAddress)
.headerMapping(ConnectivityModelFactory.newHeaderMapping(
basicAdditionalTargetHeaderMappingEntries
))
Expand All @@ -186,12 +201,12 @@ private static Map<String, Source> getSourcesByAddress(final Iterable<Source> so
return result;
}

private static String getExpectedResolvedSourceAddress(final HonoAddressAlias honoAddressAlias, final ConnectionId connectionId) {
return "hono." + honoAddressAlias.getAliasValue() + "." + connectionId;
private static String getExpectedResolvedSourceAddress(final HonoAddressAlias honoAddressAlias, final String honoTenantId) {
return "hono." + honoAddressAlias.getAliasValue() + "." + honoTenantId;
}

private static String getExpectedResolvedCommandTargetAddress(final ConnectionId connectionId) {
return "hono." + HonoAddressAlias.COMMAND.getAliasValue() + "." + connectionId + "/{{thing:id}}";
private static String getExpectedResolvedCommandTargetAddress(final String honoTenantId) {
return "hono." + HonoAddressAlias.COMMAND.getAliasValue() + "." + honoTenantId + "/{{thing:id}}";
}

}
Original file line number Diff line number Diff line change
Expand Up @@ -221,7 +221,7 @@ public void testConnectionTypeHono() throws IOException {
.toBuilder()
.id(connectionId)
.build();
final var expectedHonoConnection = generateConnectionObjectFromJsonFile("hono-connection-custom-expected.json", connectionId)
final var expectedHonoConnection = generateConnectionObjectFromJsonFile("hono-connection-custom-expected-after-adaptation.json", connectionId)
.toBuilder()
.id(connectionId)
.build();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,7 @@
"sources": [
{
"addresses": [
"hono.telemetry.test-connection-id"
"hono.telemetry.hono-tenant-id"
],
"consumerCount": 1,
"qos": 0,
Expand All @@ -32,7 +32,7 @@
"implicitStandaloneThingCreation"
],
"replyTarget": {
"address": "hono.command.test-connection-id/{{thing:id}}",
"address": "hono.command.hono-tenant-id/{{thing:id}}",
"headerMapping": {
"device_id": "custom_value1",
"user_key1": "user_value1",
Expand All @@ -48,7 +48,7 @@
},
{
"addresses": [
"hono.event.test-connection-id"
"hono.event.hono-tenant-id"
],
"consumerCount": 1,
"qos": 1,
Expand All @@ -72,7 +72,7 @@
"implicitStandaloneThingCreation"
],
"replyTarget": {
"address": "hono.command.test-connection-id/{{thing:id}}",
"address": "hono.command.hono-tenant-id/{{thing:id}}",
"headerMapping": {
"device_id": "{{ thing:id }}",
"subject": "custom_value2",
Expand All @@ -88,7 +88,7 @@
},
{
"addresses": [
"hono.command_response.test-connection-id"
"hono.command_response.hono-tenant-id"
],
"consumerCount": 1,
"qos": 0,
Expand Down Expand Up @@ -120,7 +120,7 @@
],
"targets": [
{
"address": "hono.command.test-connection-id/{{thing:id}}",
"address": "hono.command.hono-tenant-id/{{thing:id}}",
"topics": [
"_/_/things/live/messages",
"_/_/things/live/commands"
Expand All @@ -137,7 +137,7 @@
}
},
{
"address": "hono.command.test-connection-id/{{thing:id}}",
"address": "hono.command.hono-tenant-id/{{thing:id}}",
"topics": [
"_/_/things/twin/events",
"_/_/things/live/events"
Expand All @@ -158,6 +158,7 @@
"validateCertificates": false,
"processorPoolSize": 5,
"specificConfig": {
"honoTenantId": "hono-tenant-id",
"saslMechanism": "plain",
"bootstrapServers": "tcp://server1:port1,tcp://server2:port2,tcp://server3:port3",
"groupId": "custom_groupId"
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -148,6 +148,7 @@
"validateCertificates": true,
"processorPoolSize": 5,
"specificConfig": {
"honoTenantId": "hono-tenant-id",
"groupId": "custom_groupId"
},
"mappingDefinitions": {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -135,6 +135,9 @@
"failoverEnabled": true,
"validateCertificates": true,
"processorPoolSize": 5,
"specificConfig": {
"honoTenantId": "hono-tenant-id"
},
"mappingDefinitions": {
"implicitEdgeThingCreation": {
"mappingEngine": "ImplicitThingCreation",
Expand Down
Loading

0 comments on commit d008959

Please sign in to comment.