diff --git a/README.md b/README.md index a79ca31d4..ee0151668 100644 --- a/README.md +++ b/README.md @@ -8,8 +8,14 @@ _Use your AsyncAPI definition to generate java code to subscribe and publish mes ## Usage ### AsyncAPI definitions -In order for the generator to know what names to use for some methods it's necessary to make use of [AsyncAPI specification bindings](https://www.asyncapi.com/docs/specifications/2.0.0/#operationBindingsObject). +To have correctly generated code, your AsyncAPI file MUST define `operationId` for every operation. + +In order for the generator to know what names to use for some parameters it's necessary to make use of [AsyncAPI specification bindings](https://www.asyncapi.com/docs/specifications/2.0.0/#operationBindingsObject). here is an example of how to use it: + +
Kafka +

+ ```yml channels: event.lighting.measured: @@ -38,14 +44,14 @@ info: servers: production: - url: api.streetlights.smartylighting.com:{port} - protocol: mqtt + url: kafka.bootstrap:{port} + protocol: kafka variables: port: - default: '1883' + default: '9092' enum: - - '1883' - - '8883' + - '9092' + - '9093' channels: event.lighting.measured: @@ -53,9 +59,11 @@ channels: bindings: kafka: groupId: my-group + operationId: readLightMeasurement message: $ref: '#/components/messages/lightMeasured' subscribe: + operationId: updateLightMeasurement message: $ref: '#/components/messages/lightMeasured' components: @@ -79,6 +87,127 @@ components: format: date-time description: Date and time when the message was sent. ``` + +

+
+ +
MQTT +

+ +```yml +asyncapi: '2.0.0' +info: + title: Streetlights API + version: '1.0.0' + description: | + The Smartylighting Streetlights API allows you to remotely manage the city lights. + license: + name: Apache 2.0 + url: https://www.apache.org/licenses/LICENSE-2.0 + +servers: + production: + url: mqtt://localhost:{port} + protocol: mqtt + description: dummy MQTT broker + bindings: + mqtt: + clientId: guest + cleanSession: false + keepAlive: 0 + lastWill: + topic: /will + qos: 0 + message: Guest gone offline. + retain: false + variables: + port: + enum: + - '8883' + - '8884' + default: '8883' + + +defaultContentType: application/json + +channels: + smartylighting/streetlights/1/0/event/{streetlightId}/lighting/measured: + description: The topic on which measured values may be produced and consumed. + parameters: + streetlightId: + $ref: '#/components/parameters/streetlightId' + publish: + summary: Inform about environmental lighting conditions of a particular streetlight. + operationId: receiveLightMeasurement + message: + $ref: '#/components/messages/lightMeasured' + + smartylighting/streetlights/1/0/action/{streetlightId}/turn/on: + parameters: + streetlightId: + $ref: '#/components/parameters/streetlightId' + subscribe: + bindings: + mqtt: + qos: 0 + retain: false + operationId: turnOn + message: + $ref: '#/components/messages/turnOnOff' + +components: + messages: + lightMeasured: + name: lightMeasured + title: Light measured + summary: Inform about environmental lighting conditions of a particular streetlight. + payload: + $ref: "#/components/schemas/lightMeasuredPayload" + turnOnOff: + name: turnOnOff + title: Turn on/off + summary: Command a particular streetlight to turn the lights on or off. + payload: + $ref: "#/components/schemas/turnOnOffPayload" + + schemas: + lightMeasuredPayload: + type: object + properties: + lumens: + type: integer + minimum: 0 + description: Light intensity measured in lumens. + x-pi: false + sentAt: + $ref: "#/components/schemas/sentAt" + turnOnOffPayload: + type: object + properties: + command: + type: string + enum: + - on + - off + description: Whether to turn on or off the light. + x-pi: false + sentAt: + $ref: "#/components/schemas/sentAt" + sentAt: + type: string + format: date-time + description: Date and time when the message was sent. + + parameters: + streetlightId: + description: The ID of the streetlight. + schema: + type: string +``` + +

+
+ ### From the command-line interface (CLI) ```bash @@ -98,10 +227,13 @@ components: |---|---|---|---| |disableEqualsHashCode|Disable generation of equals and hashCode methods for model classes.|No|`false`| |inverseOperations|Generate an application that will publish messages to `publish` operation of channels and read messages from `subscribe` operation of channels. Literally this flag will simply swap `publish` and `subscribe` operations in the channels.
This flag will be useful when you want to generate a code of mock for your main application. Be aware, generation could be incomplete and manual changes will be required e.g. if bindings are defined only for case of main application.|No|`false`| +|javaPackage|The Java package of the generated classes. Alternatively you can set the specification extension `info.x-java-package`. If both extension and parameter are used, parameter has more priority.|No|`com.asyncapi`| |listenerPollTimeout|Only for Kafka. Timeout in ms to use when polling the consumer.|No|`3000`| |listenerConcurrency|Only for Kafka. Number of threads to run in the listener containers.|No|`3`| +|connectionTimeout|Only for MQTT. This value, measured in seconds, defines the maximum time interval the client will wait for the network connection to the MQTT server to be established. The default timeout is 30 seconds. A value of 0 disables timeout processing meaning the client will wait until the network connection is made successfully or fails.|No|`30`| +|disconnectionTimeout|Only for MQTT. The completion timeout in milliseconds when disconnecting. The default disconnect completion timeout is 5000 milliseconds.|No|`5000`| +|completionTimeout|Only for MQTT. The completion timeout in milliseconds for operations. The default completion timeout is 30000 milliseconds.|No|`30000`| |asyncapiFileDir| Path where original AsyncAPI file will be stored.|No|`src/main/resources/api/`| -|javaPackage|The Java package of the generated classes. Alternatively you can set the specification extension `info.x-java-package`. If both extension and parameter are used, parameter has more priority.|No|`com.asyncapi`| #### Examples The shortest possible syntax: @@ -137,8 +269,8 @@ docker-compose -f src/main/docker/rabbitmq.yml up -d See the list of features that are still missing in the component: - [ ] support of Kafka is done based on clear "spring-kafka" library without integration like for mqtt or amqp -- [ ] generated code for protocols mqtt and amqp could be out of date. Please have a look to [application.yaml](template/src/main/resources/application.yml) and [AmqpConfig.java](partials/AmqpConfig.java), [MqttConfig.java](partials/MqttConfig.java) -- [ ] tests are not provided +- [ ] generated code for protocol `amqp` could be out of date. Please have a look to [application.yaml](template/src/main/resources/application.yml) and [AmqpConfig.java](partials/AmqpConfig.java) +- [ ] tests for protocol `amqp` are not provided - [x] add annotation to the [model generation](template/src/main/java/com/asyncapi/model). Consider "@Valid", "@JsonProperty", "@Size", "@NotNull" e.t.c. - [ ] [`parameters`](https://github.com/asyncapi/asyncapi/blob/master/versions/2.0.0/asyncapi.md#parametersObject) for topics are not supported - [ ] [`server variables`](https://github.com/asyncapi/asyncapi/blob/master/versions/2.0.0/asyncapi.md#serverVariableObject) are not entirely supported diff --git a/filters/all.js b/filters/all.js index 3a60c722e..aa6fe7828 100644 --- a/filters/all.js +++ b/filters/all.js @@ -32,6 +32,11 @@ function toJavaType(str){ } filter.toJavaType = toJavaType; +function isDefined(obj) { + return typeof obj !== 'undefined' +} +filter.isDefined = isDefined; + function isProtocol(api, protocol){ return JSON.stringify(api.json()).includes('"protocol":"' + protocol + '"'); }; diff --git a/hooks/02_removeNotRelevantParts.js b/hooks/02_removeNotRelevantParts.js index 4c99622f7..de44549ef 100644 --- a/hooks/02_removeNotRelevantParts.js +++ b/hooks/02_removeNotRelevantParts.js @@ -20,6 +20,7 @@ module.exports = { } if (!hasMqtt) { // remove filers from template related only to mqtt + fs.unlinkSync(path.resolve(generator.targetDir, 'src/test/java/com/asyncapi/TestcontainerMqttTest.java')); } } }; \ No newline at end of file diff --git a/package.json b/package.json index b1ebf59f0..ac217f832 100644 --- a/package.json +++ b/package.json @@ -93,6 +93,21 @@ "default": 3, "required": false }, + "connectionTimeout": { + "description": "Only for MQTT. This value, measured in seconds, defines the maximum time interval the client will wait for the network connection to the MQTT server to be established. The default timeout is 30 seconds. A value of 0 disables timeout processing meaning the client will wait until the network connection is made successfully or fails.", + "default": 30, + "required": false + }, + "disconnectionTimeout": { + "description": "Only for MQTT. The completion timeout in milliseconds when disconnecting. The default disconnect completion timeout is 5000 milliseconds.", + "default": 5000, + "required": false + }, + "completionTimeout": { + "description": "Only for MQTT. The completion timeout in milliseconds for operations. The default completion timeout is 30000 milliseconds.", + "default": 30000, + "required": false + }, "asyncapiFileDir": { "description": "Parameter of @asyncapi/generator-hooks#createAsyncapiFile, allows to specify where original AsyncAPI file will be stored.", "default": "src/main/resources/api/", diff --git a/partials/AmqpConfig.java b/partials/AmqpConfig.java index 14b10ac45..1b203fe65 100644 --- a/partials/AmqpConfig.java +++ b/partials/AmqpConfig.java @@ -100,12 +100,12 @@ public RabbitTemplate rabbitTemplate() { {% for channelName, channel in asyncapi.channels() %}{% if channel.hasSubscribe() %} @Bean - public MessageChannel {{channelName | camelCase}}OutboundChannel() { + public MessageChannel {{channel.subscribe().id() | camelCase}}OutboundChannel() { return new DirectChannel(); } @Bean - @ServiceActivator(inputChannel = "{{channelName | camelCase}}OutboundChannel") + @ServiceActivator(inputChannel = "{{channel.subscribe().id() | camelCase}}OutboundChannel") public AmqpOutboundEndpoint {{channelName | camelCase}}Outbound(AmqpTemplate amqpTemplate) { AmqpOutboundEndpoint outbound = new AmqpOutboundEndpoint(amqpTemplate); outbound.setExchangeName({{channelName}}Exchange); diff --git a/partials/CommonPublisher.java b/partials/CommonPublisher.java index 4890d7614..29df1deec 100644 --- a/partials/CommonPublisher.java +++ b/partials/CommonPublisher.java @@ -12,7 +12,7 @@ public interface PublisherService { * {{line | safe}}{% endfor %}{% for line in channel.subscribe().description() | splitByLines %} * {{line | safe}}{% endfor %} */{% endif %} - @Gateway(requestChannel = "{{channelName | camelCase}}OutboundChannel") + @Gateway(requestChannel = "{{channel.subscribe().id() | camelCase}}OutboundChannel") void {{channel.subscribe().id() | camelCase}}(String data); {% endif %} {% endfor %} diff --git a/partials/MqttConfig.java b/partials/MqttConfig.java index 7606ccfe0..e9581cd5e 100644 --- a/partials/MqttConfig.java +++ b/partials/MqttConfig.java @@ -23,11 +23,20 @@ @Configuration public class Config { - @Value("${mqtt.broker.host}") - private String host; + @Value("${mqtt.broker.address}") + private String address; - @Value("${mqtt.broker.port}") - private int port; + @Value("${mqtt.broker.timeout.connection}") + private int connectionTimeout; + + @Value("${mqtt.broker.timeout.disconnection}") + private long disconnectionTimeout; + + @Value("${mqtt.broker.timeout.completion}") + private long completionTimeout; + + @Value("${mqtt.broker.clientId}") + private String clientId; @Value("${mqtt.broker.username}") private String username; @@ -35,64 +44,93 @@ public class Config { @Value("${mqtt.broker.password}") private String password; - {% for channelName, channel in asyncapi.channels() %} - @Value("${mqtt.topic.{{-channelName-}}Topic}") - private String {{channelName}}Topic; + {% for serverName, server in asyncapi.servers() %}{% if server.protocol() == 'mqtt' and server.binding('mqtt') %} + {% if server.binding('mqtt').cleanSession | isDefined %} + @Value("${mqtt.broker.cleanSession}") + private boolean cleanSession; + {% endif %}{% if server.binding('mqtt').keepAlive | isDefined %} + @Value("${mqtt.broker.timeout.keepAlive}") + private int keepAliveInterval; + {% endif %}{% if server.binding('mqtt').lastWill %} + @Value("${mqtt.broker.lastWill.topic}") + private String lastWillTopic; + + @Value("${mqtt.broker.lastWill.message}") + private String lastWillMessage; + + @Value("${mqtt.broker.lastWill.qos}") + private int lastWillQos; + + @Value("${mqtt.broker.lastWill.retain}") + private boolean lastWillRetain; + {% endif %}{% endif %}{% endfor %} - {% endfor %} + {% for channelName, channel in asyncapi.channels() %}{% if channel.hasPublish() %} + @Value("${mqtt.topic.{{-channel.publish().id() | camelCase-}}}") + private String {{channel.publish().id() | camelCase-}}Topic; + {% elif channel.hasSubscribe() %} + @Value("${mqtt.topic.{{-channel.subscribe().id() | camelCase-}}}") + private String {{channel.subscribe().id() | camelCase-}}Topic; + {% endif %}{% endfor %} @Bean public MqttPahoClientFactory mqttClientFactory() { DefaultMqttPahoClientFactory factory = new DefaultMqttPahoClientFactory(); MqttConnectOptions options = new MqttConnectOptions(); - options.setServerURIs(new String[] { host + ":" + port }); + {% for serverName, server in asyncapi.servers() %} + {% if server.protocol() == 'mqtt' and server.binding('mqtt').lastWill %}options.setWill(lastWillTopic, lastWillMessage.getBytes(), lastWillQos, lastWillRetain);{% endif %} + {% if server.protocol() == 'mqtt' and server.binding('mqtt').cleanSession | isDefined %}options.setCleanSession(cleanSession);{% endif %} + {% if server.protocol() == 'mqtt' and server.binding('mqtt').keepAlive | isDefined %}options.setKeepAliveInterval(keepAliveInterval);{% endif %}{% endfor %} + options.setServerURIs(new String[] { address }); if (!StringUtils.isEmpty(username)) { options.setUserName(username); } if (!StringUtils.isEmpty(password)) { options.setPassword(password.toCharArray()); } + options.setConnectionTimeout(connectionTimeout); factory.setConnectionOptions(options); return factory; } - // consumer - @Autowired MessageHandlerService messageHandlerService; - {% for channelName, channel in asyncapi.channels() %}{% if channel.hasPublish() %} + {% for channelName, channel in asyncapi.channels() %}{% if channel.hasPublish() %} @Bean - public IntegrationFlow {{channelName | camelCase}}Flow() { - return IntegrationFlows.from({{channelName | camelCase}}Inbound()) - .handle(messageHandlerService::handle{{channelName | upperFirst}}) + public IntegrationFlow {{channel.publish().id() | camelCase}}Flow() { + return IntegrationFlows.from({{channel.publish().id() | camelCase}}Inbound()) + .handle(messageHandlerService::handle{{channel.publish().id() | camelCase | upperFirst}}) .get(); } @Bean - public MessageProducerSupport {{channelName | camelCase}}Inbound() { - MqttPahoMessageDrivenChannelAdapter adapter = new MqttPahoMessageDrivenChannelAdapter("{{channelName | camelCase}}Subscriber", - mqttClientFactory(), {{channelName}}Topic); - adapter.setCompletionTimeout(5000); + public MessageProducerSupport {{channel.publish().id() | camelCase}}Inbound() { + MqttPahoMessageDrivenChannelAdapter adapter = new MqttPahoMessageDrivenChannelAdapter(clientId, + mqttClientFactory(), {{channel.publish().id() | camelCase}}Topic); + adapter.setCompletionTimeout(connectionTimeout); + adapter.setDisconnectCompletionTimeout(disconnectionTimeout); adapter.setConverter(new DefaultPahoMessageConverter()); return adapter; } {% endif %}{% endfor %} - // publisher {% for channelName, channel in asyncapi.channels() %}{% if channel.hasSubscribe() %} - @Bean - public MessageChannel {{channelName | camelCase}}OutboundChannel() { + public MessageChannel {{channel.subscribe().id() | camelCase}}OutboundChannel() { return new DirectChannel(); } @Bean - @ServiceActivator(inputChannel = "{{channelName | camelCase}}OutboundChannel") - public MessageHandler {{channelName | camelCase}}Outbound() { - MqttPahoMessageHandler pahoMessageHandler = new MqttPahoMessageHandler("{{channelName | camelCase}}Publisher", mqttClientFactory()); + @ServiceActivator(inputChannel = "{{channel.subscribe().id() | camelCase}}OutboundChannel") + public MessageHandler {{channel.subscribe().id() | camelCase}}Outbound() { + MqttPahoMessageHandler pahoMessageHandler = new MqttPahoMessageHandler(clientId, mqttClientFactory()); pahoMessageHandler.setAsync(true); - pahoMessageHandler.setDefaultTopic({{channelName}}Topic); + pahoMessageHandler.setCompletionTimeout(completionTimeout); + pahoMessageHandler.setDisconnectCompletionTimeout(disconnectionTimeout); + pahoMessageHandler.setDefaultTopic({{channel.subscribe().id() | camelCase}}Topic); + {% if channel.subscribe().binding('mqtt') and channel.subscribe().binding('mqtt').retain | isDefined %}pahoMessageHandler.setDefaultRetained({{channel.subscribe().binding('mqtt').retain}});{% endif %} + {% if channel.subscribe().binding('mqtt') and channel.subscribe().binding('mqtt').qos | isDefined %}pahoMessageHandler.setDefaultQos({{channel.subscribe().binding('mqtt').qos}});{% endif %} return pahoMessageHandler; } {% endif %}{% endfor %} diff --git a/template/build.gradle b/template/build.gradle index 28858e194..bc85ae60d 100644 --- a/template/build.gradle +++ b/template/build.gradle @@ -23,7 +23,6 @@ dependencies { implementation('org.springframework.kafka:spring-kafka') testImplementation('org.springframework.kafka:spring-kafka-test') testImplementation('junit:junit:4.12') - testCompile('org.testcontainers:testcontainers:1.14.1') testCompile('org.testcontainers:kafka:1.14.1') {% endif -%} implementation('com.fasterxml.jackson.jaxrs:jackson-jaxrs-json-provider') @@ -31,4 +30,5 @@ dependencies { implementation('javax.validation:validation-api') implementation('org.springframework.boot:spring-boot-starter-integration') testImplementation('org.springframework.boot:spring-boot-starter-test') + testCompile('org.testcontainers:testcontainers:1.14.1') } diff --git a/template/src/main/java/com/asyncapi/service/MessageHandlerService.java b/template/src/main/java/com/asyncapi/service/MessageHandlerService.java index 38f7e70fc..7ba3b6fbe 100644 --- a/template/src/main/java/com/asyncapi/service/MessageHandlerService.java +++ b/template/src/main/java/com/asyncapi/service/MessageHandlerService.java @@ -53,7 +53,7 @@ public class MessageHandlerService { * {{line | safe}}{% endfor %}{% for line in channel.publish().description() | splitByLines %} * {{line | safe}}{% endfor %} */{% endif %} - public void handle{{channelName | upperFirst}}(Message message) { + public void handle{{channel.publish().id() | camelCase | upperFirst}}(Message message) { LOGGER.info("handler {{channelName}}"); LOGGER.info(String.valueOf(message.getPayload().toString())); } diff --git a/template/src/main/resources/application.yml b/template/src/main/resources/application.yml index 280a3b5ec..a1cbf536e 100644 --- a/template/src/main/resources/application.yml +++ b/template/src/main/resources/application.yml @@ -33,14 +33,28 @@ amqp: mqtt: broker: {% for line in server.description() | splitByLines %} # {{line | safe}}{% endfor %} - host: tcp://{{server.url() | replace(':{port}', '')}} - port: {% if server.variable('port') %}{{server.variable('port').defaultValue()}}{% endif %} + address: {% if server.variable('port') %}{{server.url() | replace('{port}', server.variable('port').defaultValue())}}{% else %}{{server.url()}}{% endif %} username: password: + {% if server.binding('mqtt') and server.binding('mqtt').clientId %}clientId: {{server.binding('mqtt').clientId}}{% endif %} + {% if server.binding('mqtt') and server.binding('mqtt').cleanSession | isDefined %}cleanSession: {{server.binding('mqtt').cleanSession}}{% endif %} + {% if server.binding('mqtt') and server.binding('mqtt').lastWill %}lastWill: + topic: {{server.binding('mqtt').lastWill.topic}} + message: {{server.binding('mqtt').lastWill.message}} + qos: {{server.binding('mqtt').lastWill.qos}} + retain: {{server.binding('mqtt').lastWill.retain}} + {% endif %} + timeout: + completion: {{params.completionTimeout}} + disconnection: {{params.disconnectionTimeout}} + connection: {{params.connectionTimeout}} + {% if server.binding('mqtt') and server.binding('mqtt').keepAlive | isDefined %}keepAlive: {{server.binding('mqtt').keepAlive}}{% endif %} topic: - {% for channelName, channel in asyncapi.channels() %} - {{channelName}}Topic: {{channelName}} - {% endfor %} + {% for channelName, channel in asyncapi.channels() %}{% if channel.hasPublish() %} + {{channel.publish().id() | camelCase}}: {{channelName}} + {% elif channel.hasSubscribe() %} + {{channel.subscribe().id() | camelCase}}: {{channelName}} + {% endif %}{% endfor %} {% endif %}{% endfor %} {%- if asyncapi | isProtocol('kafka') %} spring: diff --git a/template/src/test/java/com/asyncapi/TestcontainerMqttTest.java b/template/src/test/java/com/asyncapi/TestcontainerMqttTest.java new file mode 100644 index 000000000..6f4da6e78 --- /dev/null +++ b/template/src/test/java/com/asyncapi/TestcontainerMqttTest.java @@ -0,0 +1,113 @@ +{%- set hasSubscribe = false -%} +{%- set hasPublish = false -%} +{%- for channelName, channel in asyncapi.channels() -%} + {%- if channel.hasPublish() -%} + {%- set hasPublish = true -%} + {%- endif -%} + {%- if channel.hasSubscribe() -%} + {%- set hasSubscribe = true -%} + {%- endif -%} +{%- endfor -%} +package {{ params['userJavaPackage'] }}; + +{% for channelName, channel in asyncapi.channels() %} {% if channel.hasSubscribe() %} +import {{ params['userJavaPackage'] }}.model.{{channel.subscribe().message().payload().uid() | camelCase | upperFirst}}; +{% endif %} {% endfor %} +{% for channelName, channel in asyncapi.channels() %} {% if channel.hasPublish() %} +import {{ params['userJavaPackage'] }}.model.{{channel.publish().message().payload().uid() | camelCase | upperFirst}}; +{% endif %} {% endfor %} +{% if hasSubscribe %}import {{ params['userJavaPackage'] }}.service.PublisherService;{% endif %} +import org.eclipse.paho.client.mqttv3.*; +import org.junit.ClassRule; +import org.junit.Test; +import org.junit.jupiter.api.AfterEach; +import org.junit.jupiter.api.BeforeEach; +import org.junit.runner.RunWith; +import org.springframework.beans.factory.annotation.Autowired; +import org.springframework.beans.factory.annotation.Value; +import org.springframework.boot.test.context.SpringBootTest; +import org.springframework.test.context.DynamicPropertyRegistry; +import org.springframework.test.context.DynamicPropertySource; +import org.springframework.test.context.junit4.SpringRunner; +import org.testcontainers.containers.GenericContainer; + +import java.util.ArrayList; +import java.util.List; +import java.util.UUID; + +import static org.junit.Assert.assertEquals; + +/** + * Example of tests for mqtt based on testcontainers library + */ +@RunWith(SpringRunner.class) +@SpringBootTest +public class TestcontainerMqttTest { + + {% for channelName, channel in asyncapi.channels() %}{% if channel.hasPublish() %} + @Value("${mqtt.topic.{{-channel.publish().id() | camelCase-}}}") + private String {{channel.publish().id() | camelCase-}}Topic; + {% elif channel.hasSubscribe() %} + @Value("${mqtt.topic.{{-channel.subscribe().id() | camelCase-}}}") + private String {{channel.subscribe().id() | camelCase-}}Topic; + {% endif %}{% endfor %} + + @ClassRule + public static GenericContainer mosquitto = new GenericContainer("eclipse-mosquitto").withExposedPorts(1883); + {% if hasSubscribe %} + @Autowired + private PublisherService publisherService; + {% endif %} + private IMqttClient publisher; + + @DynamicPropertySource + public static void mqttProperties(DynamicPropertyRegistry registry) { + String address = "tcp://" + mosquitto.getContainerIpAddress() + mosquitto.getMappedPort(1883); + registry.add("mqtt.broker.address", () -> address); + } + + @BeforeEach + public void before() throws MqttException { + String address = "tcp://" + mosquitto.getContainerIpAddress() + mosquitto.getMappedPort(1883); + publisher = new MqttClient(address, UUID.randomUUID().toString()); + publisher.connect(); + } + + @AfterEach + public void after() throws MqttException { + publisher.disconnect(); + } + + {% for channelName, channel in asyncapi.channels() %} {% if channel.hasSubscribe() %} + @Test + public void {{channel.subscribe().id() | camelCase}}ProducerTestcontainers() throws MqttException { + {{channel.subscribe().message().payload().uid() | camelCase | upperFirst}} payload = new {{channel.subscribe().message().payload().uid() | camelCase | upperFirst}}(); + + List receivedMessages = new ArrayList<>(); + publisher.subscribe({{channel.subscribe().id() | camelCase-}}Topic, (topic, message) -> { + receivedMessages.add(message); + }); + + publisherService.{{channel.subscribe().id() | camelCase}}(payload.toString()); + + MqttMessage message = receivedMessages.get(receivedMessages.size() - 1); + + assertEquals("Message is wrong", payload.toString().getBytes(), message.getPayload()); + } + {% endif %} {% if channel.hasPublish() %} + @Test + public void {{channel.publish().id() | camelCase}}ConsumerTestcontainers() throws Exception { + {{channel.publish().message().payload().uid() | camelCase | upperFirst}} payload = new {{channel.publish().message().payload().uid() | camelCase | upperFirst}}(); + + sendMessage({{channel.publish().id() | camelCase-}}Topic, payload.toString().getBytes()); + + Thread.sleep(1_000); + } + {% endif %} + {% endfor %} + {% if hasPublish %} + protected void sendMessage(String topic, byte[] message) throws Exception { + publisher.publish(topic, new MqttMessage(message)); + } + {% endif %} +}