Skip to content

Commit

Permalink
provide arbitrary methods for amqp
Browse files Browse the repository at this point in the history
  • Loading branch information
Tenischev committed Dec 11, 2023
1 parent 9d101c6 commit fbcff6a
Show file tree
Hide file tree
Showing 4 changed files with 76 additions and 6 deletions.
4 changes: 3 additions & 1 deletion partials/AmqpPublisher.java
Original file line number Diff line number Diff line change
Expand Up @@ -24,7 +24,9 @@ public interface PublisherService {
* {{line | safe}}{% endfor %}
*/{% endif %}
void {{channel.subscribe().id() | camelCase}}({{varName | upperFirst}} payload);

{%- if channel.hasParameters() %}
void {{channel.subscribe().id() | camelCase}}({%- for parameterName, parameter in channel.parameters() %}String {{parameterName}}, {% endfor %}{{varName | upperFirst}} payload);
{% endif %}
{% endif %}
{% endfor %}

Expand Down
27 changes: 27 additions & 0 deletions partials/AmqpPublisherImpl.java
Original file line number Diff line number Diff line change
Expand Up @@ -30,7 +30,9 @@ public class PublisherServiceImpl implements PublisherService {
{%- endif %}
{% endfor %}

{%- set anyChannelHasParameter = false %}
{% for channelName, channel in asyncapi.channels() %}
{%- set anyChannelHasParameter = anyChannelHasParameter or channel.hasParameters() %}
{%- if channel.hasSubscribe() %}
{%- if channel.subscribe().hasMultipleMessages() %}
{%- set varName = "object" %}
Expand All @@ -46,9 +48,34 @@ public class PublisherServiceImpl implements PublisherService {
template.convertAndSend({{channelVariable}}Exchange, {{channelVariable}}RoutingKey, payload);
}

{%- if channel.hasParameters() %}
public void {{channel.subscribe().id() | camelCase}}({%- for parameterName, parameter in channel.parameters() %}String {{parameterName}}, {% endfor %}{{varName | upperFirst}} payload) {
String compiledRoutingKey = compileRoutingKey({{channelVariable}}RoutingKey, {% for parameterName, parameter in channel.parameters() %}{{parameterName}}{% if not loop.last %}, {% endif %}{%- endfor %});
template.convertAndSend({{channelVariable}}Exchange, compiledRoutingKey, payload);
}
{% endif %}

{% endif %}
{% endfor %}

{%- if anyChannelHasParameter %}
private String compileRoutingKey(String routingKeyTemplate, String... parameters) {
StringBuilder result = new StringBuilder();
int routeKeyPossition = 0;
int parametersIndex = 0;
while (routeKeyPossition < routingKeyTemplate.length()) {
while (routingKeyTemplate.charAt(routeKeyPossition) != '*') {
routeKeyPossition++;
result.append(routingKeyTemplate.charAt(routeKeyPossition));
}
routeKeyPossition++;
String parameter = parameters[parametersIndex++];
result.append(parameter != null ? parameter : "*");
}
return result.toString();
}
{%- endif %}

}

{% endmacro %}
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,7 @@

import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.messaging.Message;
import org.springframework.stereotype.Service;
{% if asyncapi | isProtocol('kafka') and hasPublish %}
Expand All @@ -29,6 +30,8 @@
{% endif %}
{% if asyncapi | isProtocol('amqp') and hasPublish %}
import org.springframework.amqp.rabbit.annotation.RabbitListener;
import org.springframework.amqp.support.AmqpHeaders;
import org.springframework.messaging.handler.annotation.Header;
{% for channelName, channel in asyncapi.channels() %}
{%- if channel.hasPublish() %}
{%- for message in channel.publish().messages() %}
Expand All @@ -38,6 +41,8 @@
{%- endfor %}
{% endif %}
import javax.annotation.processing.Generated;
import java.util.ArrayList;
import java.util.List;

@Generated(value="com.asyncapi.generator.template.spring", date="{{''|currentTime }}")
@Service
Expand Down Expand Up @@ -74,17 +79,54 @@ public class MessageHandlerService {
{% endfor %}

{% elif asyncapi | isProtocol('amqp') %}
{%- set anyChannelHasParameter = false %}
{% for channelName, channel in asyncapi.channels() %}
{% if channel.hasPublish() %}
{%- set anyChannelHasParameter = anyChannelHasParameter or channel.hasParameters() %}
{%- set schemaName = channel.publish().message().payload().uid() | camelCase | upperFirst %}
{%- set varName = channelName | toAmqpNeutral(channel.hasParameters(), channel.parameters()) %}
{%- if channel.hasParameters() %}
@Value("${amqp.{{- varName -}}.routingKey}")
private String {{varName}}RoutingKey;

{%- endif %}
@RabbitListener(queues = "${amqp.{{- varName -}}.queue}")
public void {{channel.publish().id() | camelCase}}({{schemaName}} payload){
public void {{channel.publish().id() | camelCase}}({{schemaName}} payload, @Header(AmqpHeaders.RECEIVED_ROUTING_KEY) String routKey) {
{%- if channel.hasParameters() %}
List<String> parameters = decompileRoutingKey({{varName}}RoutingKey, routKey);
{{channel.publish().id() | camelCase}}({%- for parameterName, parameter in channel.parameters() %}parameters.get({{loop.index0}}), {%- endfor %}payload);
{% endif %}
LOGGER.info("Message received from {{- varName -}} : " + payload);
}
{%- if channel.hasParameters() %}
public void {{channel.publish().id() | camelCase}}({%- for parameterName, parameter in channel.parameters() %}String {{parameterName}}, {%- endfor %}{{schemaName}} payload) {
// parametrized listener
}
{% endif %}
{% endif %}
{% endfor %}

{%- if anyChannelHasParameter %}
private List<String> decompileRoutingKey(String pattern, String routKey) {
List<String> parameters = new ArrayList<>();
int routeKeyPossition = 0;
int patternPosition = 0;
while (routeKeyPossition < routKey.length()) {
while (pattern.charAt(patternPosition) == routKey.charAt(routeKeyPossition)) {
routeKeyPossition++;
patternPosition++;
}
routeKeyPossition++;
patternPosition += 2; // skip .*
StringBuilder parameter = new StringBuilder();
while (pattern.charAt(patternPosition) != routKey.charAt(routeKeyPossition)) {
parameter.append(routKey.charAt(routeKeyPossition));
routeKeyPossition++;
}
parameters.add(parameter.toString());
}
return parameters;
}
{%- endif %}
{% else %}
{% for channelName, channel in asyncapi.channels() %}
{% if channel.hasPublish() %}
Expand Down
5 changes: 2 additions & 3 deletions tests/mocks/amqp.yml
Original file line number Diff line number Diff line change
Expand Up @@ -50,11 +50,10 @@ channels:
bindings:
amqp:
is: routingKey
queue:
name: lightMeasurementQueue
exchange:
name: lightMeasurementExchange
durable: false
autoDelete: true
exclusive: true
parameters:
streetlightId:
$ref: '#/components/parameters/streetlightId'
Expand Down

0 comments on commit fbcff6a

Please sign in to comment.