From d4868fe520eea5b82bc65171a62dca9ac2287240 Mon Sep 17 00:00:00 2001 From: Semen Tenishchev Date: Sat, 6 Jun 2020 23:29:33 +0300 Subject: [PATCH 1/6] feat: update mqtt support --- README.md | 5 +- filters/all.js | 5 + package.json | 15 +++ partials/AmqpConfig.java | 4 +- partials/CommonPublisher.java | 2 +- partials/MqttConfig.java | 92 +++++++++++++------ .../service/MessageHandlerService.java | 2 +- template/src/main/resources/application.yml | 24 ++++- 8 files changed, 112 insertions(+), 37 deletions(-) diff --git a/README.md b/README.md index a79ca31d4..1de1563fe 100644 --- a/README.md +++ b/README.md @@ -98,10 +98,13 @@ components: |---|---|---|---| |disableEqualsHashCode|Disable generation of equals and hashCode methods for model classes.|No|`false`| |inverseOperations|Generate an application that will publish messages to `publish` operation of channels and read messages from `subscribe` operation of channels. Literally this flag will simply swap `publish` and `subscribe` operations in the channels.
This flag will be useful when you want to generate a code of mock for your main application. Be aware, generation could be incomplete and manual changes will be required e.g. if bindings are defined only for case of main application.|No|`false`| +|javaPackage|The Java package of the generated classes. Alternatively you can set the specification extension `info.x-java-package`. If both extension and parameter are used, parameter has more priority.|No|`com.asyncapi`| |listenerPollTimeout|Only for Kafka. Timeout in ms to use when polling the consumer.|No|`3000`| |listenerConcurrency|Only for Kafka. Number of threads to run in the listener containers.|No|`3`| +|connectionTimeout|Only for MQTT. This value, measured in seconds, defines the maximum time interval the client will wait for the network connection to the MQTT server to be established. The default timeout is 30 seconds. A value of 0 disables timeout processing meaning the client will wait until the network connection is made successfully or fails.|No|`30`| +|disconnectionTimeout|Only for MQTT. The completion timeout in milliseconds when disconnecting. The default disconnect completion timeout is 5000 milliseconds.|No|`5000`| +|completionTimeout|Only for MQTT. The completion timeout in milliseconds for operations. The default completion timeout is 30000 milliseconds.|No|`30000`| |asyncapiFileDir| Path where original AsyncAPI file will be stored.|No|`src/main/resources/api/`| -|javaPackage|The Java package of the generated classes. Alternatively you can set the specification extension `info.x-java-package`. If both extension and parameter are used, parameter has more priority.|No|`com.asyncapi`| #### Examples The shortest possible syntax: diff --git a/filters/all.js b/filters/all.js index 3a60c722e..aa6fe7828 100644 --- a/filters/all.js +++ b/filters/all.js @@ -32,6 +32,11 @@ function toJavaType(str){ } filter.toJavaType = toJavaType; +function isDefined(obj) { + return typeof obj !== 'undefined' +} +filter.isDefined = isDefined; + function isProtocol(api, protocol){ return JSON.stringify(api.json()).includes('"protocol":"' + protocol + '"'); }; diff --git a/package.json b/package.json index 63e76c7ae..56da0b72a 100644 --- a/package.json +++ b/package.json @@ -92,6 +92,21 @@ "default": 3, "required": false }, + "connectionTimeout": { + "description": "Only for MQTT. This value, measured in seconds, defines the maximum time interval the client will wait for the network connection to the MQTT server to be established. The default timeout is 30 seconds. A value of 0 disables timeout processing meaning the client will wait until the network connection is made successfully or fails.", + "default": 30, + "required": false + }, + "disconnectionTimeout": { + "description": "Only for MQTT. The completion timeout in milliseconds when disconnecting. The default disconnect completion timeout is 5000 milliseconds.", + "default": 5000, + "required": false + }, + "completionTimeout": { + "description": "Only for MQTT. The completion timeout in milliseconds for operations. The default completion timeout is 30000 milliseconds.", + "default": 30000, + "required": false + }, "asyncapiFileDir": { "description": "Parameter of @asyncapi/generator-hooks#createAsyncapiFile, allows to specify where original AsyncAPI file will be stored.", "default": "src/main/resources/api/", diff --git a/partials/AmqpConfig.java b/partials/AmqpConfig.java index 14b10ac45..1b203fe65 100644 --- a/partials/AmqpConfig.java +++ b/partials/AmqpConfig.java @@ -100,12 +100,12 @@ public RabbitTemplate rabbitTemplate() { {% for channelName, channel in asyncapi.channels() %}{% if channel.hasSubscribe() %} @Bean - public MessageChannel {{channelName | camelCase}}OutboundChannel() { + public MessageChannel {{channel.subscribe().id() | camelCase}}OutboundChannel() { return new DirectChannel(); } @Bean - @ServiceActivator(inputChannel = "{{channelName | camelCase}}OutboundChannel") + @ServiceActivator(inputChannel = "{{channel.subscribe().id() | camelCase}}OutboundChannel") public AmqpOutboundEndpoint {{channelName | camelCase}}Outbound(AmqpTemplate amqpTemplate) { AmqpOutboundEndpoint outbound = new AmqpOutboundEndpoint(amqpTemplate); outbound.setExchangeName({{channelName}}Exchange); diff --git a/partials/CommonPublisher.java b/partials/CommonPublisher.java index 4890d7614..29df1deec 100644 --- a/partials/CommonPublisher.java +++ b/partials/CommonPublisher.java @@ -12,7 +12,7 @@ public interface PublisherService { * {{line | safe}}{% endfor %}{% for line in channel.subscribe().description() | splitByLines %} * {{line | safe}}{% endfor %} */{% endif %} - @Gateway(requestChannel = "{{channelName | camelCase}}OutboundChannel") + @Gateway(requestChannel = "{{channel.subscribe().id() | camelCase}}OutboundChannel") void {{channel.subscribe().id() | camelCase}}(String data); {% endif %} {% endfor %} diff --git a/partials/MqttConfig.java b/partials/MqttConfig.java index 7606ccfe0..6b1b05634 100644 --- a/partials/MqttConfig.java +++ b/partials/MqttConfig.java @@ -23,11 +23,20 @@ @Configuration public class Config { - @Value("${mqtt.broker.host}") - private String host; + @Value("${mqtt.broker.address}") + private String address; - @Value("${mqtt.broker.port}") - private int port; + @Value("${mqtt.broker.timeout.connection}") + private int connectionTimeout; + + @Value("${mqtt.broker.timeout.disconnection}") + private long disconnectionTimeout; + + @Value("${mqtt.broker.timeout.completion}") + private long completionTimeout; + + @Value("${mqtt.broker.clientId}") + private String clientId; @Value("${mqtt.broker.username}") private String username; @@ -35,64 +44,93 @@ public class Config { @Value("${mqtt.broker.password}") private String password; - {% for channelName, channel in asyncapi.channels() %} - @Value("${mqtt.topic.{{-channelName-}}Topic}") - private String {{channelName}}Topic; + {% for serverName, server in asyncapi.servers() %}{% if server.protocol() == 'mqtt' and server.binding('mqtt') %} + {% if server.binding('mqtt').cleanSession | isDefined %} + @Value("${mqtt.broker.cleanSession}") + private boolean cleanSession; + {% endif %}{% if server.binding('mqtt').keepAlive | isDefined %} + @Value("${mqtt.broker.timeout.keepAlive}") + private int keepAliveInterval; + {% endif %}{% if server.binding('mqtt').lastWill %} + @Value("${mqtt.broker.lastWill.topic}") + private String lastWillTopic; - {% endfor %} + @Value("${mqtt.broker.lastWill.message}") + private String lastWillMessage; + + @Value("${mqtt.broker.lastWill.qos}") + private int lastWillQos; + + @Value("${mqtt.broker.lastWill.retain}") + private boolean lastWillRetain; + {% endif %}{% endif %}{% endfor %} + + {% for channelName, channel in asyncapi.channels() %}{% if channel.hasPublish() %} + @Value("${mqtt.topic.{{-channel.publish().id() | camelCase-}}}") + private String {{channel.publish().id() | camelCase-}}Topic; + {% elif channel.hasSubscribe() %} + @Value("${mqtt.topic.{{-channel.subscribe().id() | camelCase-}}}") + private String {{channel.subscribe().id() | camelCase-}}Topic; + {% endif %}{% endfor %} @Bean public MqttPahoClientFactory mqttClientFactory() { DefaultMqttPahoClientFactory factory = new DefaultMqttPahoClientFactory(); MqttConnectOptions options = new MqttConnectOptions(); - options.setServerURIs(new String[] { host + ":" + port }); + {% for serverName, server in asyncapi.servers() %} + {% if server.protocol() == 'mqtt' and server.binding('mqtt').lastWill %}options.setWill(lastWillTopic, lastWillMessage.getBytes(), lastWillQos, lastWillRetain);{% endif %} + {% if server.protocol() == 'mqtt' and server.binding('mqtt').cleanSession | isDefined %}options.setCleanSession(cleanSession);{% endif %} + {% if server.protocol() == 'mqtt' and server.binding('mqtt').keepAlive | isDefined %}options.setKeepAliveInterval(keepAliveInterval);{% endif %}{% endfor %} + options.setServerURIs(new String[] { address }); if (!StringUtils.isEmpty(username)) { options.setUserName(username); } if (!StringUtils.isEmpty(password)) { options.setPassword(password.toCharArray()); } + options.setConnectionTimeout(connectionTimeout); factory.setConnectionOptions(options); return factory; } - // consumer - @Autowired MessageHandlerService messageHandlerService; - {% for channelName, channel in asyncapi.channels() %}{% if channel.hasPublish() %} + {% for channelName, channel in asyncapi.channels() %}{% if channel.hasPublish() %} // this is what i should listen @Bean - public IntegrationFlow {{channelName | camelCase}}Flow() { - return IntegrationFlows.from({{channelName | camelCase}}Inbound()) - .handle(messageHandlerService::handle{{channelName | upperFirst}}) + public IntegrationFlow {{channel.publish().id() | camelCase}}Flow() { + return IntegrationFlows.from({{channel.publish().id() | camelCase}}Inbound()) + .handle(messageHandlerService::handle{{channel.publish().id() | camelCase | upperFirst}}) .get(); } @Bean - public MessageProducerSupport {{channelName | camelCase}}Inbound() { - MqttPahoMessageDrivenChannelAdapter adapter = new MqttPahoMessageDrivenChannelAdapter("{{channelName | camelCase}}Subscriber", - mqttClientFactory(), {{channelName}}Topic); - adapter.setCompletionTimeout(5000); + public MessageProducerSupport {{channel.publish().id() | camelCase}}Inbound() { + MqttPahoMessageDrivenChannelAdapter adapter = new MqttPahoMessageDrivenChannelAdapter(clientId, + mqttClientFactory(), {{channel.publish().id() | camelCase}}Topic); + adapter.setCompletionTimeout(connectionTimeout); + adapter.setDisconnectCompletionTimeout(disconnectionTimeout); adapter.setConverter(new DefaultPahoMessageConverter()); return adapter; } {% endif %}{% endfor %} - // publisher - {% for channelName, channel in asyncapi.channels() %}{% if channel.hasSubscribe() %} - + {% for channelName, channel in asyncapi.channels() %}{% if channel.hasSubscribe() %} // this is where i could publish @Bean - public MessageChannel {{channelName | camelCase}}OutboundChannel() { + public MessageChannel {{channel.subscribe().id() | camelCase}}OutboundChannel() { return new DirectChannel(); } @Bean - @ServiceActivator(inputChannel = "{{channelName | camelCase}}OutboundChannel") - public MessageHandler {{channelName | camelCase}}Outbound() { - MqttPahoMessageHandler pahoMessageHandler = new MqttPahoMessageHandler("{{channelName | camelCase}}Publisher", mqttClientFactory()); + @ServiceActivator(inputChannel = "{{channel.subscribe().id() | camelCase}}OutboundChannel") + public MessageHandler {{channel.subscribe().id() | camelCase}}Outbound() { + MqttPahoMessageHandler pahoMessageHandler = new MqttPahoMessageHandler(clientId, mqttClientFactory()); pahoMessageHandler.setAsync(true); - pahoMessageHandler.setDefaultTopic({{channelName}}Topic); + pahoMessageHandler.setCompletionTimeout(completionTimeout); + pahoMessageHandler.setDisconnectCompletionTimeout(disconnectionTimeout); + pahoMessageHandler.setDefaultTopic({{channel.subscribe().id() | camelCase}}Topic); + {% if channel.subscribe().binding('mqtt') and channel.subscribe().binding('mqtt').retain | isDefined %}pahoMessageHandler.setDefaultRetained({{channel.subscribe().binding('mqtt').retain}});{% endif %} + {% if channel.subscribe().binding('mqtt') and channel.subscribe().binding('mqtt').qos | isDefined %}pahoMessageHandler.setDefaultQos({{channel.subscribe().binding('mqtt').qos}});{% endif %} return pahoMessageHandler; } {% endif %}{% endfor %} diff --git a/template/src/main/java/com/asyncapi/service/MessageHandlerService.java b/template/src/main/java/com/asyncapi/service/MessageHandlerService.java index 38f7e70fc..7ba3b6fbe 100644 --- a/template/src/main/java/com/asyncapi/service/MessageHandlerService.java +++ b/template/src/main/java/com/asyncapi/service/MessageHandlerService.java @@ -53,7 +53,7 @@ public class MessageHandlerService { * {{line | safe}}{% endfor %}{% for line in channel.publish().description() | splitByLines %} * {{line | safe}}{% endfor %} */{% endif %} - public void handle{{channelName | upperFirst}}(Message message) { + public void handle{{channel.publish().id() | camelCase | upperFirst}}(Message message) { LOGGER.info("handler {{channelName}}"); LOGGER.info(String.valueOf(message.getPayload().toString())); } diff --git a/template/src/main/resources/application.yml b/template/src/main/resources/application.yml index 280a3b5ec..6f29dd6f1 100644 --- a/template/src/main/resources/application.yml +++ b/template/src/main/resources/application.yml @@ -33,14 +33,28 @@ amqp: mqtt: broker: {% for line in server.description() | splitByLines %} # {{line | safe}}{% endfor %} - host: tcp://{{server.url() | replace(':{port}', '')}} - port: {% if server.variable('port') %}{{server.variable('port').defaultValue()}}{% endif %} + address: {% if server.variable('port') %}{{server.url() | replace('{port}', server.variable('port').defaultValue())}}{% else %}{{server.url()}}{% endif %} username: password: + {% if server.binding('mqtt') and server.binding('mqtt').clientId %}clientId: {{server.binding('mqtt').clientId}}{% endif %} + {% if server.binding('mqtt') and server.binding('mqtt').cleanSession | isDefined %}cleanSession: {{server.binding('mqtt').cleanSession}}{% endif %} + {% if server.binding('mqtt') and server.binding('mqtt').lastWill %}lastWill: + topic: {{server.binding('mqtt').lastWill.topic}} + message: {{server.binding('mqtt').lastWill.message}} + qos: {{server.binding('mqtt').lastWill.qos}} + retain: {{server.binding('mqtt').lastWill.retain}} + {% endif %} + timeout: + completion: {{param.completionTimeout}} + disconnection: {{param.disconnectionTimeout}} + connection: {{param.connectionTimeout}} + {% if server.binding('mqtt') and server.binding('mqtt').keepAlive | isDefined %}keepAlive: {{server.binding('mqtt').keepAlive}}{% endif %} topic: - {% for channelName, channel in asyncapi.channels() %} - {{channelName}}Topic: {{channelName}} - {% endfor %} + {% for channelName, channel in asyncapi.channels() %}{% if channel.hasPublish() %} + {{channel.publish().id() | camelCase}}: {{channelName}} + {% elif channel.hasSubscribe() %} + {{channel.subscribe().id() | camelCase}}: {{channelName}} + {% endif %}{% endfor %} {% endif %}{% endfor %} {%- if asyncapi | isProtocol('kafka') %} spring: From 7c033e12d490b72755e4e0256dccad0c12ce73b2 Mon Sep 17 00:00:00 2001 From: Semen Tenishchev Date: Sun, 7 Jun 2020 19:02:07 +0300 Subject: [PATCH 2/6] doc: provide an example for README --- README.md | 141 +++++++++++++++++++++++++++++++++++++++++++++++++++--- 1 file changed, 135 insertions(+), 6 deletions(-) diff --git a/README.md b/README.md index 1de1563fe..4d3eff359 100644 --- a/README.md +++ b/README.md @@ -8,8 +8,14 @@ _Use your AsyncAPI definition to generate java code to subscribe and publish mes ## Usage ### AsyncAPI definitions -In order for the generator to know what names to use for some methods it's necessary to make use of [AsyncAPI specification bindings](https://www.asyncapi.com/docs/specifications/2.0.0/#operationBindingsObject). +To have correctly generated code, your AsyncAPI file MUST define `operationId` for every operation. + +In order for the generator to know what names to use for some parameters it's necessary to make use of [AsyncAPI specification bindings](https://www.asyncapi.com/docs/specifications/2.0.0/#operationBindingsObject). here is an example of how to use it: + +
Kafka +

+ ```yml channels: event.lighting.measured: @@ -38,14 +44,14 @@ info: servers: production: - url: api.streetlights.smartylighting.com:{port} - protocol: mqtt + url: kafka.bootstrap:{port} + protocol: kafka variables: port: - default: '1883' + default: '9092' enum: - - '1883' - - '8883' + - '9092' + - '9093' channels: event.lighting.measured: @@ -53,9 +59,11 @@ channels: bindings: kafka: groupId: my-group + operationId: readLightMeasurement message: $ref: '#/components/messages/lightMeasured' subscribe: + operationId: updateLightMeasurement message: $ref: '#/components/messages/lightMeasured' components: @@ -79,6 +87,127 @@ components: format: date-time description: Date and time when the message was sent. ``` + +

+
+ +
MQTT +

+ +```yml +asyncapi: '2.0.0' +info: + title: Streetlights API + version: '1.0.0' + description: | + The Smartylighting Streetlights API allows you to remotely manage the city lights. + license: + name: Apache 2.0 + url: https://www.apache.org/licenses/LICENSE-2.0 + +servers: + production: + url: mqtt://localhost:{port} + protocol: mqtt + description: dummy MQTT broker + bindings: + mqtt: + clientId: guest + cleanSession: false + keepAlive: 0 + lastWill: + topic: /will + qos: 0 + message: Guest gone offline. + retain: false + variables: + port: + enum: + - '8883' + - '8884' + default: '8883' + + +defaultContentType: application/json + +channels: + smartylighting/streetlights/1/0/event/{streetlightId}/lighting/measured: + description: The topic on which measured values may be produced and consumed. + parameters: + streetlightId: + $ref: '#/components/parameters/streetlightId' + publish: + summary: Inform about environmental lighting conditions of a particular streetlight. + operationId: receiveLightMeasurement + message: + $ref: '#/components/messages/lightMeasured' + + smartylighting/streetlights/1/0/action/{streetlightId}/turn/on: + parameters: + streetlightId: + $ref: '#/components/parameters/streetlightId' + subscribe: + bindings: + mqtt: + qos: 0 + retain: false + operationId: turnOn + message: + $ref: '#/components/messages/turnOnOff' + +components: + messages: + lightMeasured: + name: lightMeasured + title: Light measured + summary: Inform about environmental lighting conditions of a particular streetlight. + payload: + $ref: "#/components/schemas/lightMeasuredPayload" + turnOnOff: + name: turnOnOff + title: Turn on/off + summary: Command a particular streetlight to turn the lights on or off. + payload: + $ref: "#/components/schemas/turnOnOffPayload" + + schemas: + lightMeasuredPayload: + type: object + properties: + lumens: + type: integer + minimum: 0 + description: Light intensity measured in lumens. + x-pi: false + sentAt: + $ref: "#/components/schemas/sentAt" + turnOnOffPayload: + type: object + properties: + command: + type: string + enum: + - on + - off + description: Whether to turn on or off the light. + x-pi: false + sentAt: + $ref: "#/components/schemas/sentAt" + sentAt: + type: string + format: date-time + description: Date and time when the message was sent. + + parameters: + streetlightId: + description: The ID of the streetlight. + schema: + type: string +``` + +

+
+ ### From the command-line interface (CLI) ```bash From 0df7098f0a202e9a2ecb59da6f0fea74eb051b29 Mon Sep 17 00:00:00 2001 From: Semen Tenishchev Date: Sun, 7 Jun 2020 23:38:16 +0300 Subject: [PATCH 3/6] feat: provide mqtt tests --- partials/MqttConfig.java | 4 +- template/src/main/resources/application.yml | 6 +- .../com/asyncapi/TestcontainerMqttTest.java | 113 ++++++++++++++++++ 3 files changed, 118 insertions(+), 5 deletions(-) create mode 100644 template/src/test/java/com/asyncapi/TestcontainerMqttTest.java diff --git a/partials/MqttConfig.java b/partials/MqttConfig.java index 6b1b05634..e9581cd5e 100644 --- a/partials/MqttConfig.java +++ b/partials/MqttConfig.java @@ -96,7 +96,7 @@ public MqttPahoClientFactory mqttClientFactory() { @Autowired MessageHandlerService messageHandlerService; - {% for channelName, channel in asyncapi.channels() %}{% if channel.hasPublish() %} // this is what i should listen + {% for channelName, channel in asyncapi.channels() %}{% if channel.hasPublish() %} @Bean public IntegrationFlow {{channel.publish().id() | camelCase}}Flow() { return IntegrationFlows.from({{channel.publish().id() | camelCase}}Inbound()) @@ -115,7 +115,7 @@ public MqttPahoClientFactory mqttClientFactory() { } {% endif %}{% endfor %} - {% for channelName, channel in asyncapi.channels() %}{% if channel.hasSubscribe() %} // this is where i could publish + {% for channelName, channel in asyncapi.channels() %}{% if channel.hasSubscribe() %} @Bean public MessageChannel {{channel.subscribe().id() | camelCase}}OutboundChannel() { return new DirectChannel(); diff --git a/template/src/main/resources/application.yml b/template/src/main/resources/application.yml index 6f29dd6f1..a1cbf536e 100644 --- a/template/src/main/resources/application.yml +++ b/template/src/main/resources/application.yml @@ -45,9 +45,9 @@ mqtt: retain: {{server.binding('mqtt').lastWill.retain}} {% endif %} timeout: - completion: {{param.completionTimeout}} - disconnection: {{param.disconnectionTimeout}} - connection: {{param.connectionTimeout}} + completion: {{params.completionTimeout}} + disconnection: {{params.disconnectionTimeout}} + connection: {{params.connectionTimeout}} {% if server.binding('mqtt') and server.binding('mqtt').keepAlive | isDefined %}keepAlive: {{server.binding('mqtt').keepAlive}}{% endif %} topic: {% for channelName, channel in asyncapi.channels() %}{% if channel.hasPublish() %} diff --git a/template/src/test/java/com/asyncapi/TestcontainerMqttTest.java b/template/src/test/java/com/asyncapi/TestcontainerMqttTest.java new file mode 100644 index 000000000..6f4da6e78 --- /dev/null +++ b/template/src/test/java/com/asyncapi/TestcontainerMqttTest.java @@ -0,0 +1,113 @@ +{%- set hasSubscribe = false -%} +{%- set hasPublish = false -%} +{%- for channelName, channel in asyncapi.channels() -%} + {%- if channel.hasPublish() -%} + {%- set hasPublish = true -%} + {%- endif -%} + {%- if channel.hasSubscribe() -%} + {%- set hasSubscribe = true -%} + {%- endif -%} +{%- endfor -%} +package {{ params['userJavaPackage'] }}; + +{% for channelName, channel in asyncapi.channels() %} {% if channel.hasSubscribe() %} +import {{ params['userJavaPackage'] }}.model.{{channel.subscribe().message().payload().uid() | camelCase | upperFirst}}; +{% endif %} {% endfor %} +{% for channelName, channel in asyncapi.channels() %} {% if channel.hasPublish() %} +import {{ params['userJavaPackage'] }}.model.{{channel.publish().message().payload().uid() | camelCase | upperFirst}}; +{% endif %} {% endfor %} +{% if hasSubscribe %}import {{ params['userJavaPackage'] }}.service.PublisherService;{% endif %} +import org.eclipse.paho.client.mqttv3.*; +import org.junit.ClassRule; +import org.junit.Test; +import org.junit.jupiter.api.AfterEach; +import org.junit.jupiter.api.BeforeEach; +import org.junit.runner.RunWith; +import org.springframework.beans.factory.annotation.Autowired; +import org.springframework.beans.factory.annotation.Value; +import org.springframework.boot.test.context.SpringBootTest; +import org.springframework.test.context.DynamicPropertyRegistry; +import org.springframework.test.context.DynamicPropertySource; +import org.springframework.test.context.junit4.SpringRunner; +import org.testcontainers.containers.GenericContainer; + +import java.util.ArrayList; +import java.util.List; +import java.util.UUID; + +import static org.junit.Assert.assertEquals; + +/** + * Example of tests for mqtt based on testcontainers library + */ +@RunWith(SpringRunner.class) +@SpringBootTest +public class TestcontainerMqttTest { + + {% for channelName, channel in asyncapi.channels() %}{% if channel.hasPublish() %} + @Value("${mqtt.topic.{{-channel.publish().id() | camelCase-}}}") + private String {{channel.publish().id() | camelCase-}}Topic; + {% elif channel.hasSubscribe() %} + @Value("${mqtt.topic.{{-channel.subscribe().id() | camelCase-}}}") + private String {{channel.subscribe().id() | camelCase-}}Topic; + {% endif %}{% endfor %} + + @ClassRule + public static GenericContainer mosquitto = new GenericContainer("eclipse-mosquitto").withExposedPorts(1883); + {% if hasSubscribe %} + @Autowired + private PublisherService publisherService; + {% endif %} + private IMqttClient publisher; + + @DynamicPropertySource + public static void mqttProperties(DynamicPropertyRegistry registry) { + String address = "tcp://" + mosquitto.getContainerIpAddress() + mosquitto.getMappedPort(1883); + registry.add("mqtt.broker.address", () -> address); + } + + @BeforeEach + public void before() throws MqttException { + String address = "tcp://" + mosquitto.getContainerIpAddress() + mosquitto.getMappedPort(1883); + publisher = new MqttClient(address, UUID.randomUUID().toString()); + publisher.connect(); + } + + @AfterEach + public void after() throws MqttException { + publisher.disconnect(); + } + + {% for channelName, channel in asyncapi.channels() %} {% if channel.hasSubscribe() %} + @Test + public void {{channel.subscribe().id() | camelCase}}ProducerTestcontainers() throws MqttException { + {{channel.subscribe().message().payload().uid() | camelCase | upperFirst}} payload = new {{channel.subscribe().message().payload().uid() | camelCase | upperFirst}}(); + + List receivedMessages = new ArrayList<>(); + publisher.subscribe({{channel.subscribe().id() | camelCase-}}Topic, (topic, message) -> { + receivedMessages.add(message); + }); + + publisherService.{{channel.subscribe().id() | camelCase}}(payload.toString()); + + MqttMessage message = receivedMessages.get(receivedMessages.size() - 1); + + assertEquals("Message is wrong", payload.toString().getBytes(), message.getPayload()); + } + {% endif %} {% if channel.hasPublish() %} + @Test + public void {{channel.publish().id() | camelCase}}ConsumerTestcontainers() throws Exception { + {{channel.publish().message().payload().uid() | camelCase | upperFirst}} payload = new {{channel.publish().message().payload().uid() | camelCase | upperFirst}}(); + + sendMessage({{channel.publish().id() | camelCase-}}Topic, payload.toString().getBytes()); + + Thread.sleep(1_000); + } + {% endif %} + {% endfor %} + {% if hasPublish %} + protected void sendMessage(String topic, byte[] message) throws Exception { + publisher.publish(topic, new MqttMessage(message)); + } + {% endif %} +} From f513e3476017fe594bb56b893bf079ec149f1dcb Mon Sep 17 00:00:00 2001 From: Semen Tenishchev Date: Mon, 8 Jun 2020 09:33:10 +0300 Subject: [PATCH 4/6] move testcontainer lib to common section --- template/build.gradle | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/template/build.gradle b/template/build.gradle index 28858e194..bc85ae60d 100644 --- a/template/build.gradle +++ b/template/build.gradle @@ -23,7 +23,6 @@ dependencies { implementation('org.springframework.kafka:spring-kafka') testImplementation('org.springframework.kafka:spring-kafka-test') testImplementation('junit:junit:4.12') - testCompile('org.testcontainers:testcontainers:1.14.1') testCompile('org.testcontainers:kafka:1.14.1') {% endif -%} implementation('com.fasterxml.jackson.jaxrs:jackson-jaxrs-json-provider') @@ -31,4 +30,5 @@ dependencies { implementation('javax.validation:validation-api') implementation('org.springframework.boot:spring-boot-starter-integration') testImplementation('org.springframework.boot:spring-boot-starter-test') + testCompile('org.testcontainers:testcontainers:1.14.1') } From 24168e1d75b196074491c218714b3ee23ee42c1f Mon Sep 17 00:00:00 2001 From: Semen Tenishchev Date: Tue, 16 Jun 2020 22:34:59 +0300 Subject: [PATCH 5/6] add removal of mqtt test if no such protocol in AsyncAPI --- hooks/removeNotRelevantParts.js | 1 + 1 file changed, 1 insertion(+) diff --git a/hooks/removeNotRelevantParts.js b/hooks/removeNotRelevantParts.js index 4c99622f7..de44549ef 100644 --- a/hooks/removeNotRelevantParts.js +++ b/hooks/removeNotRelevantParts.js @@ -20,6 +20,7 @@ module.exports = { } if (!hasMqtt) { // remove filers from template related only to mqtt + fs.unlinkSync(path.resolve(generator.targetDir, 'src/test/java/com/asyncapi/TestcontainerMqttTest.java')); } } }; \ No newline at end of file From e02f255733e36593e4b75123e1bd701e266d2d12 Mon Sep 17 00:00:00 2001 From: Semen Tenishchev Date: Wed, 17 Jun 2020 10:51:11 +0300 Subject: [PATCH 6/6] update Missing features list --- README.md | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/README.md b/README.md index 4d3eff359..ee0151668 100644 --- a/README.md +++ b/README.md @@ -269,8 +269,8 @@ docker-compose -f src/main/docker/rabbitmq.yml up -d See the list of features that are still missing in the component: - [ ] support of Kafka is done based on clear "spring-kafka" library without integration like for mqtt or amqp -- [ ] generated code for protocols mqtt and amqp could be out of date. Please have a look to [application.yaml](template/src/main/resources/application.yml) and [AmqpConfig.java](partials/AmqpConfig.java), [MqttConfig.java](partials/MqttConfig.java) -- [ ] tests are not provided +- [ ] generated code for protocol `amqp` could be out of date. Please have a look to [application.yaml](template/src/main/resources/application.yml) and [AmqpConfig.java](partials/AmqpConfig.java) +- [ ] tests for protocol `amqp` are not provided - [x] add annotation to the [model generation](template/src/main/java/com/asyncapi/model). Consider "@Valid", "@JsonProperty", "@Size", "@NotNull" e.t.c. - [ ] [`parameters`](https://github.com/asyncapi/asyncapi/blob/master/versions/2.0.0/asyncapi.md#parametersObject) for topics are not supported - [ ] [`server variables`](https://github.com/asyncapi/asyncapi/blob/master/versions/2.0.0/asyncapi.md#serverVariableObject) are not entirely supported