Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat: add support for channel parameters #352

Merged
merged 16 commits into from
Dec 13, 2023
Merged
Changes from 1 commit
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Prev Previous commit
Next Next commit
provide arbitrary methods for mqtt
  • Loading branch information
Tenischev committed Dec 11, 2023
commit 554dcee4e7aa9127fb39c3ecde818aab5cddd5d1
Original file line number Diff line number Diff line change
@@ -15,31 +15,27 @@
import org.springframework.beans.factory.annotation.Value;
import org.springframework.messaging.Message;
import org.springframework.stereotype.Service;
{% if asyncapi | isProtocol('kafka') and hasPublish %}
{%- if asyncapi | isProtocol('kafka') and hasPublish %}
import org.springframework.kafka.annotation.KafkaListener;
import org.springframework.kafka.support.KafkaHeaders;
import org.springframework.messaging.handler.annotation.Header;
import org.springframework.messaging.handler.annotation.Payload;
{% for channelName, channel in asyncapi.channels() %}
{%- if channel.hasPublish() %}
{%- for message in channel.publish().messages() %}
import {{ params['userJavaPackage'] }}.model.{{message.payload().uid() | camelCase | upperFirst}};
{%- endfor %}
{%- endif %}
{%- endfor %}
{% endif %}
{% if asyncapi | isProtocol('amqp') and hasPublish %}
{%- endif %}
{%- if asyncapi | isProtocol('mqtt') and hasPublish %}
import org.springframework.integration.mqtt.support.MqttHeaders;
{%- endif %}
{%- if asyncapi | isProtocol('amqp') and hasPublish %}
import org.springframework.amqp.rabbit.annotation.RabbitListener;
import org.springframework.amqp.support.AmqpHeaders;
import org.springframework.messaging.handler.annotation.Header;
{% for channelName, channel in asyncapi.channels() %}
{%- if channel.hasPublish() %}
{%- for message in channel.publish().messages() %}
{%- endif %}
{% for channelName, channel in asyncapi.channels() %}
{%- if channel.hasPublish() %}
{%- for message in channel.publish().messages() %}
import {{ params['userJavaPackage'] }}.model.{{message.payload().uid() | camelCase | upperFirst}};
{%- endfor %}
{%- endif %}
{%- endfor %}
{% endif %}
{%- endif %}
{% endfor %}
import javax.annotation.processing.Generated;
import java.util.ArrayList;
import java.util.List;
@@ -48,64 +44,95 @@
@Service
public class MessageHandlerService {

{%- set anyChannelHasParameter = false %}
private static final Logger LOGGER = LoggerFactory.getLogger(MessageHandlerService.class);
{% if asyncapi | isProtocol('kafka') %}
{% for channelName, channel in asyncapi.channels() %}
{%- if channel.hasPublish() %}
{%- if channel.publish().hasMultipleMessages() %}
{%- set typeName = "Object" %}
{%- else %}
{%- set typeName = channel.publish().message().payload().uid() | camelCase | upperFirst %}
{%- endif %}
{% if channel.description() or channel.publish().description() %}/**{% for line in channel.description() | splitByLines %}
* {{line | safe}}{% endfor %}{% for line in channel.publish().description() | splitByLines %}
* {{line | safe}}{% endfor %}
*/{% endif %}
{%- set route = channelName %}
{%- if channel.hasParameters() %}
{%- set route = route | replaceAll(".", "\\.") %}
{%- for parameterName, parameter in channel.parameters() %}
{%- set route = route | replace("{" + parameterName + "}", ".*") %}
{%- endfor %}
{%- endif %}
@KafkaListener({% if channel.hasParameters() %}topicPattern{% else %}topics{% endif %} = "{{route}}"{% if channel.publish().binding('kafka') %}, groupId = "{{channel.publish().binding('kafka').groupId}}"{% endif %})
public void {{channel.publish().id() | camelCase}}(@Payload {{typeName}} payload,

{% for channelName, channel in asyncapi.channels() %}
{%- if channel.hasPublish() %}
{%- set hasParameters = channel.hasParameters() %}
{%- set anyChannelHasParameter = anyChannelHasParameter or hasParameters %}
{%- set methodName = channel.publish().id() | camelCase%}
{%- if channel.publish().hasMultipleMessages() %}
{%- set typeName = "Object" %}
{%- else %}
{%- set typeName = channel.publish().message().payload().uid() | camelCase | upperFirst %}
{%- endif %}
{% set javaDoc = '' %}
{% if channel.description() or channel.publish().description() %}
{%- set javaDoc = javaDoc + '/**\n' %}
{%- for line in channel.description() | splitByLines %}
{%- set javaDoc = javaDoc + ' * ' + (line | safe) %}
{%- set javaDoc = javaDoc + '\n' %}
{%- endfor %}
{%- for line in channel.publish().description() | splitByLines %}
{%- set javaDoc = javaDoc + ' * ' + (line | safe) %}
{%- set javaDoc = javaDoc + '\n' %}
{%- endfor %}
{%- set javaDoc = javaDoc + ' */' %}
{% endif %}
{%- if asyncapi | isProtocol('kafka') %}
{%- set route = channelName %}
{%- if hasParameters %}
{%- set route = route | replaceAll(".", "\\.") %}
{%- for parameterName, parameter in channel.parameters() %}
{%- set route = route | replace("{" + parameterName + "}", ".*") %}
{%- endfor %}
{%- endif %}
{{javaDoc}}
@KafkaListener({% if hasParameters %}topicPattern{% else %}topics{% endif %} = "{{route}}"{% if channel.publish().binding('kafka') %}, groupId = "{{channel.publish().binding('kafka').groupId}}"{% endif %})
public void {{methodName}}(@Payload {{typeName}} payload,
@Header(KafkaHeaders.{%- if params.springBoot2 %}RECEIVED_MESSAGE_KEY{% else %}RECEIVED_KEY{% endif -%}) Integer key,
@Header(KafkaHeaders.{%- if params.springBoot2 %}RECEIVED_PARTITION_ID{% else %}RECEIVED_PARTITION{% endif -%}) int partition,
@Header(KafkaHeaders.RECEIVED_TIMESTAMP) long timestamp) {
LOGGER.info("Key: " + key + ", Payload: " + payload.toString() + ", Timestamp: " + timestamp + ", Partition: " + partition);
}
{%- endif %}
{% endfor %}

{% elif asyncapi | isProtocol('amqp') %}
{%- set anyChannelHasParameter = false %}
{% for channelName, channel in asyncapi.channels() %}
{% if channel.hasPublish() %}
{%- set anyChannelHasParameter = anyChannelHasParameter or channel.hasParameters() %}
{%- set schemaName = channel.publish().message().payload().uid() | camelCase | upperFirst %}
{%- set varName = channelName | toAmqpNeutral(channel.hasParameters(), channel.parameters()) %}
{%- if channel.hasParameters() %}
@Value("${amqp.{{- varName -}}.routingKey}")
private String {{varName}}RoutingKey;

{%- endif %}
@RabbitListener(queues = "${amqp.{{- varName -}}.queue}")
public void {{channel.publish().id() | camelCase}}({{schemaName}} payload, @Header(AmqpHeaders.RECEIVED_ROUTING_KEY) String routKey) {
{%- if channel.hasParameters() %}
List<String> parameters = decompileRoutingKey({{varName}}RoutingKey, routKey);
{{channel.publish().id() | camelCase}}({%- for parameterName, parameter in channel.parameters() %}parameters.get({{loop.index0}}), {%- endfor %}payload);
{% elif asyncapi | isProtocol('amqp') %}
{%- set propertyValueName = channelName | toAmqpNeutral(hasParameters, channel.parameters()) %}
{%- if hasParameters %}
@Value("${amqp.{{- propertyValueName -}}.routingKey}")
private String {{propertyValueName}}RoutingKey;
{% endif %}
LOGGER.info("Message received from {{- varName -}} : " + payload);
{{javaDoc}}
@RabbitListener(queues = "${amqp.{{- propertyValueName -}}.queue}")
public void {{methodName}}({{typeName}} payload, @Header(AmqpHeaders.RECEIVED_ROUTING_KEY) String routKey) {
{%- if hasParameters %}
List<String> parameters = decompileRoutingKey({{propertyValueName}}RoutingKey, routKey);
{{methodName}}({%- for parameterName, parameter in channel.parameters() %}parameters.get({{loop.index0}}), {% endfor %}payload);
{% endif %}
LOGGER.info("Message received from {{- propertyValueName -}} : " + payload);
}
{%- if channel.hasParameters() %}
public void {{channel.publish().id() | camelCase}}({%- for parameterName, parameter in channel.parameters() %}String {{parameterName}}, {%- endfor %}{{schemaName}} payload) {
{% if hasParameters %}
{{javaDoc}}
public void {{methodName}}({%- for parameterName, parameter in channel.parameters() %}String {{parameterName}}, {% endfor %}{{typeName}} payload) {
// parametrized listener
}
{%- endif %}
{%- else %}
{%- if hasParameters %}
@Value("${mqtt.topic.{{-methodName-}}}")
private String {{methodName}}Topic;

{%- endif %}
{{javaDoc}}
public void handle{{methodName | upperFirst}}(Message<?> message) {
String topic = message.getHeaders().get(MqttHeaders.RECEIVED_TOPIC).toString();
{%- if hasParameters %}
List<String> parameters = decodeTopic({{methodName}}Topic, topic);
{%- endif %}
{{methodName}}({%- for parameterName, parameter in channel.parameters() %}parameters.get({{loop.index0}}), {% endfor %}({{typeName}}) message.getPayload());
}
{{javaDoc}}
public void {{methodName}}({%- for parameterName, parameter in channel.parameters() %}String {{parameterName}}, {% endfor %}{{typeName}} payload) {
LOGGER.info("handler {{channelName}}");
LOGGER.info(String.valueOf(payload.toString()));
}
{% endif %}
{% endif %}
{% endfor %}
{%- if anyChannelHasParameter %}
{% endif %}
{% endfor %}

{%- if anyChannelHasParameter %}
{%- if asyncapi | isProtocol('kafka') %}
{%- elif asyncapi | isProtocol('amqp') %}
private List<String> decompileRoutingKey(String pattern, String routKey) {
List<String> parameters = new ArrayList<>();
int routeKeyPossition = 0;
@@ -126,19 +153,27 @@ private List<String> decompileRoutingKey(String pattern, String routKey) {
}
return parameters;
}
{%- endif %}
{% else %}
{% for channelName, channel in asyncapi.channels() %}
{% if channel.hasPublish() %}
{% if channel.description() or channel.publish().description() %}/**{% for line in channel.description() | splitByLines %}
* {{line | safe}}{% endfor %}{% for line in channel.publish().description() | splitByLines %}
* {{line | safe}}{% endfor %}
*/{% endif %}
public void handle{{channel.publish().id() | camelCase | upperFirst}}(Message<?> message) {
LOGGER.info("handler {{channelName}}");
LOGGER.info(String.valueOf(message.getPayload().toString()));
{%- else %}
private List<String> decodeTopic(String topicPattern, String topic) {
List<String> parameters = new ArrayList<>();
int topicPossition = 0;
int patternPosition = 0;
while (topicPossition < topic.length()) {
while (topicPattern.charAt(patternPosition) == topic.charAt(topicPossition)) {
topicPossition++;
patternPosition++;
}
topicPossition++;
patternPosition += 2; // skip +
StringBuilder parameter = new StringBuilder();
while (topicPattern.charAt(patternPosition) != topic.charAt(topicPossition)) {
parameter.append(topic.charAt(topicPossition));
topicPossition++;
}
parameters.add(parameter.toString());
}
return parameters;
}
{% endif %}
{% endfor %}
{%- endif %}
{% endif %}
}