Skip to content

Commit

Permalink
general amqp routing key support
Browse files Browse the repository at this point in the history
  • Loading branch information
Tenischev committed Dec 3, 2023
1 parent b6e05c8 commit 269e1a2
Show file tree
Hide file tree
Showing 4 changed files with 172 additions and 35 deletions.
18 changes: 14 additions & 4 deletions filters/all.js
Original file line number Diff line number Diff line change
Expand Up @@ -197,13 +197,13 @@ function replaceAll(originalStr, replacePattern, replaceString) {
}
filter.replaceAll = replaceAll;

function toTopicString(channelName, hasParameters, parameters, convertDots, replaceValue) {
function toTopicString(channelName, hasParameters, parameters, convertDots, replaceParameterValue, replaceDots = "\.") {
if (hasParameters) {
let topicName = channelName
if (convertDots) {
topicName = replaceAll(topicName, ".", "\.")
topicName = replaceAll(topicName, ".", replaceDots)
}
Object.keys(parameters).forEach(value => topicName = topicName.replace("{" + value + "}", replaceValue))
Object.keys(parameters).forEach(value => topicName = topicName.replace("{" + value + "}", replaceParameterValue))
return topicName
} else {
return channelName
Expand All @@ -214,4 +214,14 @@ function toMqttTopicString(channelName, hasParameters, parameters) {
return toTopicString(channelName, hasParameters, parameters, false, "+")
}

filter.toMqttTopicString = toMqttTopicString
filter.toMqttTopicString = toMqttTopicString

function toAmqpNeutral(channelName, hasParameters, parameters) {
return toTopicString(_.camelCase(channelName), hasParameters, parameters, true, "", "")
}
filter.toAmqpNeutral = toAmqpNeutral

function toAmqpKey(channelName, hasParameters, parameters) {
return toTopicString(channelName, hasParameters, parameters, true, "*")
}
filter.toAmqpKey = toAmqpKey
60 changes: 33 additions & 27 deletions partials/AmqpConfig.java
Original file line number Diff line number Diff line change
Expand Up @@ -32,20 +32,44 @@ public class Config {


{% for channelName, channel in asyncapi.channels() %}
{% if channel.hasSubscribe() %}
@Value("${amqp.{{- channelName -}}.exchange}")
private String {{channelName}}Exchange;
{% set varName = channelName | toAmqpNeutral(channel.hasParameters(), channel.parameters()) %}
{% if channel.binding('amqp') and channel.binding('amqp').exchange %}
@Value("${amqp.{{- varName -}}.exchange}")
private String {{varName}}Exchange;
{% endif %}

@Value("${amqp.{{- varName -}}.routingKey}")
private String {{varName}}RoutingKey;

@Value("${amqp.{{- channelName -}}.routingKey}")
private String {{channelName}}RoutingKey;
{% if channel.binding('amqp') and channel.binding('amqp').queue %}
@Value("${amqp.{{- varName -}}.queue}")
private String {{varName}}Queue;
{% endif %}

{% if channel.hasPublish() %}
@Value("${amqp.{{- channelName -}}.queue}")
private String {{channelName}}Queue;
{% set name = varName | camelCase %}
{% if channel.binding('amqp') and channel.binding('amqp').exchange %}
{% if channel.binding('amqp').exchange.type and channel.binding('amqp').exchange.type !== 'default' %}{% set type = channel.binding('amqp').exchange.type | camelCase %}{% else %}{% set type = 'Topic' %}{% endif %}
{% set type = type + 'Exchange' %}
@Bean
public {{type}} {{name}}Exchange() {
return new {{type}}({{varName}}Exchange, {% if channel.binding('amqp').exchange.durable %}{{channel.binding('amqp').exchange.durable}}{% else %}true{% endif%}, {% if channel.binding('amqp').exchange.exclusive %}{{channel.binding('amqp').exchange.exclusive}}{% else %}false{% endif%});
}

{% if channel.binding('amqp') and channel.binding('amqp').queue %}
@Bean
public Binding binding{{name | upperFirst}}({{type}} {{name}}Exchange, Queue {{name}}Queue) {
return BindingBuilder.bind({{name}}Queue).to({{name}}Exchange){% if channel.binding('amqp').exchange.type !== 'fanout' %}.with({{varName}}RoutingKey){% endif %};
}
{% endif %}{% endif %}

{% if channel.binding('amqp') and channel.binding('amqp').queue %}
@Bean
public Queue {{name}}Queue() {
return new Queue({{varName}}Queue, {% if channel.binding('amqp').queue.durable %}{{channel.binding('amqp').queue.durable}}{% else %}true{% endif%}, {% if channel.binding('amqp').queue.exclusive %}{{channel.binding('amqp').queue.exclusive}}{% else %}false{% endif%}, {% if channel.binding('amqp').queue.autoDelete %}{{channel.binding('amqp').queue.autoDelete}}{% else %}false{% endif%});
}
{% endif %}
{%- endfor %}

{% endfor %}

@Bean
public ConnectionFactory connectionFactory() {
Expand All @@ -56,24 +80,6 @@ public ConnectionFactory connectionFactory() {
return connectionFactory;
}

@Bean
public Declarables exchanges() {
return new Declarables(
{% for channelName, channel in asyncapi.channels() %}{% if channel.hasSubscribe() %}
new TopicExchange({{channelName}}Exchange, true, false){% if not loop.last %},{% endif %}
{% endif %}{% endfor %}
);
}

@Bean
public Declarables queues() {
return new Declarables(
{% for channelName, channel in asyncapi.channels() %}{% if channel.hasPublish() %}
new Queue({{channelName}}Queue, true, false, false){% if not loop.last %},{% endif %}
{% endif %}{% endfor %}
);


@Bean
public MessageConverter converter() {
return new Jackson2JsonMessageConverter();
Expand Down
8 changes: 4 additions & 4 deletions template/src/main/resources/application.yml
Original file line number Diff line number Diff line change
Expand Up @@ -18,10 +18,10 @@ amqp:
username: {% if server.variable('username') %}{{server.variable('username').defaultValue()}}{% endif %}
password:
{% for channelName, channel in asyncapi.channels() %}
{{channelName}}:
{% if channel.hasSubscribe() %} exchange: {{channel.subscribe().binding('amqp').exchange.name}} {% endif %}
{% if channel.hasSubscribe() %} routingKey: {{channel.subscribe().binding('amqp').routingKey}}{% endif %}
{% if channel.hasPublish() %} queue: {{channel.publish().binding('amqp').queue.name}}{% endif %}
{{channelName | toAmqpNeutral(channel.hasParameters(), channel.parameters())}}:
{% if channel.binding('amqp') and channel.binding('amqp').exchange %}exchange: {{channel.binding('amqp').exchange.name}}{% endif %}
routingKey: {{channelName | toAmqpKey(channel.hasParameters(), channel.parameters())}}
{% if channel.binding('amqp') and channel.binding('amqp').queue %}queue: {{channel.binding('amqp').queue.name}}{% endif %}
{% endfor %}
{% endif %}

Expand Down
121 changes: 121 additions & 0 deletions tests/mocks/amqp.yml
Original file line number Diff line number Diff line change
@@ -0,0 +1,121 @@
asyncapi: '2.0.0'
info:
title: Streetlights API
version: '1.0.0'
description: |
The Smartylighting Streetlights API allows you to remotely manage the city lights.
license:
name: Apache 2.0
url: https://www.apache.org/licenses/LICENSE-2.0

servers:
production:
url: localhost
protocol: amqp
description: RabbitMQ
variables:
port:
default: '5672'
username:
default: guest


defaultContentType: application/json

channels:
smartylighting.streetlights.event.{streetlightId}.lighting.measured:
description: The topic on which measured values may be produced and consumed.
bindings:
amqp:
is: routingKey
exchange:
name: lightMeasurementExchange
durable: false
autoDelete: true
queue:
name: lightMeasurementQueue
durable: false
autoDelete: true
exclusive: true
parameters:
streetlightId:
$ref: '#/components/parameters/streetlightId'
publish:
summary: Inform about environmental lighting conditions of a particular streetlight.
operationId: receiveLightMeasurement
message:
$ref: '#/components/messages/lightMeasured'

smartylighting.streetlights.1.0.action.{streetlightId}.turn.on.{zoneId}:
bindings:
amqp:
is: routingKey
queue:
name: lightMeasurementQueue
durable: false
autoDelete: true
exclusive: true
parameters:
streetlightId:
$ref: '#/components/parameters/streetlightId'
zoneId:
$ref: '#/components/parameters/zoneId'
subscribe:
operationId: turnOn
message:
$ref: '#/components/messages/turnOnOff'

components:
messages:
lightMeasured:
name: lightMeasured
title: Light measured
summary: Inform about environmental lighting conditions of a particular streetlight.
payload:
$ref: "#/components/schemas/lightMeasuredPayload"
turnOnOff:
name: turnOnOff
title: Turn on/off
summary: Command a particular streetlight to turn the lights on or off.
payload:
$ref: "#/components/schemas/turnOnOffPayload"

schemas:
lightMeasuredPayload:
type: object
properties:
lumens:
type: integer
minimum: 0
description: Light intensity measured in lumens.
sentAt:
$ref: "#/components/schemas/sentAt"
turnOnOffPayload:
type: object
properties:
command:
type: string
enum:
- on
- off
description: Whether to turn on or off the light.
sentAt:
$ref: "#/components/schemas/sentAt"
sentAt:
type: string
format: date-time
description: Date and time when the message was sent.

parameters:
streetlightId:
description: The ID of the streetlight.
schema:
type: string

zoneId:
description: The ID of the streetlight.
schema:
type: object
properties:
id:
type: integer

0 comments on commit 269e1a2

Please sign in to comment.