Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat: update aqmp config #310

Merged

Conversation

VaishnaviNandakumar
Copy link
Contributor

Description

  • This implementation makes use of the Direct Exchange type of the AMQP protocol where a routing key is used to route messages to a queue. The configuration file has been structured to include the Exchange information under the publisher binding while both the Queue and Routing Key are specified under the subscriber section.
  • Provides a functional setup of AMQP configuration that supports a publisher service and a consumer listener.
  • This configuration is flexible to maintain the exchanges, queues and routing keys of more than one channel.
  • Test config files are added under the test/user_examples folder.

Test

  1. Single Channel
    Message passed to RabbitMQ via command line.
python.exe rabbitmqadmin.exe publish exchange=lightMeasuredExchange routing_key=lightMeasuredRoutingKey payload="{ \"id\":1, \"lumens\": 20}
Message published

Output from App Listener
1

  1. Multiple Channels
    Message passed to RabbitMQ via command line.
python.exe rabbitmqadmin.exe publish exchange=lightMeasuredExchange_Streetlight1 routing_key=lightMeasuredRoutingKey_Streetlight1 payload="{ \"id\":1, \"lumens\": 25}
Message published

python.exe rabbitmqadmin.exe publish exchange=lightMeasuredExchange_Streetlight2 routing_key=lightMeasuredRoutingKey_Streetlight2 payload="{ \"id\":2, \"lumens\": 50}
Message published

Output from App Listener
2

Related issue(s)
Resolves #307

@VaishnaviNandakumar
Copy link
Contributor Author

@Tenischev Hi, please have a look. Thanks.

@Tenischev Tenischev changed the title Feat/update aqmp config feat: update aqmp config Jun 17, 2023
@Tenischev
Copy link
Member

Hi @VaishnaviNandakumar
Did you check, maybe some improvements also required in rabbitmq.yml?

@VaishnaviNandakumar
Copy link
Contributor Author

Hi @Tenischev, I am not very familiar with docker images and configs so I didn't want to touch that.

@VaishnaviNandakumar
Copy link
Contributor Author

Hi @Tenischev , any update on this?

@Tenischev
Copy link
Member

Hi @VaishnaviNandakumar ,sorry I was busy this week, but I'm remember about pr. Will check soon

Copy link
Member

@Tenischev Tenischev left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Sorry for such long delay

partials/AmqpPublisher.java Show resolved Hide resolved
partials/AmqpPublisher.java Show resolved Hide resolved

{% elif asyncapi | isProtocol('amqp') %}
{% for channelName, channel in asyncapi.channels() %}
{%- set schemaName = channel.subscribe().message().payload().uid() | camelCase | upperFirst %}
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I think channel.publish().message() should be there, not subscribe

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

From my understanding of AMQP, the messages are published into an exchange and listens from a queue. This queue is defined under the subscriber/consumer module.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

There's a small logical/conceptual change required here. Will go through it once again and revise it.

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

"the messages are published into an exchange and listens from a queue" - agree.
But please have a look to explanation of AyncAPI publish\subscribe meaning - https://www.asyncapi.com/docs/tutorials/getting-started/hello-world

In this example, you only have one channel called hello. The sample application subscribes to this channel to receive hello {name} messages.

asyncapi: 2.6.0
info:
  title: Hello world application
  version: '0.1.0'
channels:
  hello:
    publish:
      message:
        payload:
          type: string
          pattern: '^hello .+$'

By means, if AsyncAPI define publish, that means that our generated application should be able to receive messages from the publish. And vice-versa, if AsyncAPI define subscribe, that means that our generated application should be able to sned messages to the subscribe.

The MessageHandlerService should be responsible for receiving messages to do so, listeners for the AsyncAPI publish channels should be defined.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

  1. I had earlier referred to this doc from which my understanding of publish/subscribe was the opposite of your explanation. That is, Publish sends messages and Subscriber receives it. Following this, I thought a listener would be most appropriate for a subscriber instead.
  2. With the upcoming v3 changes to AsyncAPI, publish/subscribe will be replaced by send/receive. Should I hold on to the current changes until then? It will require a rewrite of the template generation.

Copy link
Member

@Tenischev Tenischev Sep 4, 2023

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Sorry, I do not find the doc is opposite to what I wrote.
It is the same meaning:
image
If in AsyncAPI the publish defined, then we should generate Subscriber\Consumer\MessageHandler.

Updating to AsyncAPI v3 is a global task and need to be implemented in whole template, not a part.
I would suggest to you to finish current PR for v2

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I think I got where I went wrong. I was trying to validate the doc from a server perspective while it should've been the client instead. Sorry about that. So just to confirm, publish defines the consumer and subscribe defines the producer.
In that case for AMQP, queue and routing key should be defined under publish while exchange should come under subscribe.

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

In that case for AMQP, queue and routing key should be defined under publish while exchange should come under subscribe.

Yes, this is also my understanding

Also, AMQP channel binding might be interesting for you in perspective of this PR

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Thanks. Will update accordingly.

Comment on lines 21 to 24
{{channelName}}:
exchange: {{channel.publish().binding('amqp').exchange}}
queue: {{channel.subscribe().binding('amqp').queue}}
routingKey: {{channel.subscribe().binding('amqp').routingKey}}
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I'm not sure that this is a good idea to generate exchange and queue for all channels without taking into account if API really has publish\subscribe for this channel

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@Tenischev So would the best way to go about it be to put in a check to see if the publish/ subscribe has been defined for that channel and then define exchange/queues etc?

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I think yes, of course this is not so critical for the configuration file, but it will keep it clean

Comment on lines 30 to 40

{% for channelName, channel in asyncapi.channels() %}
@Value("${amqp.{{- channelName -}}.exchange}")
private String {{channelName}}Exchange;

{% endif %}{% endfor %}
{% for channelName, channel in asyncapi.channels() %}{% if channel.hasPublish() %}
@Value("${amqp.queue.{{- channelName -}}}")
@Value("${amqp.{{- channelName -}}.queue}")
private String {{channelName}}Queue;

{% endif %}{% endfor %}
@Value("${amqp.{{- channelName -}}.routingKey}")
private String {{channelName}}RoutingKey;
{% endfor %}
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I'm not sure that this is a good idea to generate exchange and queue for all channels without taking into account if API really has publish\subscribe for this channel

{{channelName}}_Exchange,
{{channelName}}_Binding
{% else %}
{{channelName}}_Queue,
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Also here, you could use {% if not loop.last %},{% endif %} to add coma after _Binding instead of counting i

@VaishnaviNandakumar
Copy link
Contributor Author

@Tenischev Hi, sorry I missed out on these comments. Slightly packed rn but I will try to get back to resolve these comments in a week. Thanks!

Copy link
Contributor Author

@VaishnaviNandakumar VaishnaviNandakumar left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I have made the below changes in this update

  • Restructured config yaml files to fit AMQP bindings as defined here. It wasn't done as per standard before.
  • Added a routingKey key under bindings in subscriber. The previous implementation had it hard coded.
  • Removed BindingBuilder from AmqpConfig. Initially it binded all the exchanges, queues and routing keys that came under a single channel together which wasn't conceptually right.
  • Added hasPublisher() and hasSubscriber() checks.

Output Validation

  1. Single Channel
    SC - 1
    Single Channel - CMD

  2. Multiple Channels
    Multi Channels - CMD
    MC - 1
    MC - 2

@Tenischev
Copy link
Member

Tenischev commented Sep 5, 2023

@VaishnaviNandakumar, please check failed tests.
Also I see you added new API to user examples which is great, but now we a lack of example for amqp in tests/mocks/ if you find it possible, it would be very good to place small and simple amqp API in tests/mocks/ and create snapshoot test for it.
As ref you could have a look on tests/mocks/kafka.yml or tests/mocks/mqtt.yml there just one publish, one subscribe channel and simple payload component.

@VaishnaviNandakumar
Copy link
Contributor Author

@Tenischev which editor are you using? I'm working so far on IntelliJ but I am not able to run any tests. Need some help with the setup.

@Tenischev
Copy link
Member

@VaishnaviNandakumar I also use IntelliJ, but template test requires npm. Just run npm install and then npm test from the template folder, to update snapshoots npm test -- -u.
By looking to GitHub report most issues are just new empty lines or removed empty lines.
Additionally, you could find standard Java unit test that in folder template/src/test/java/com/asyncapi, but there is no special unit test for AMQP.

@VaishnaviNandakumar
Copy link
Contributor Author

@Tenischev I have updated the PR to fix the failing tests. However, I do not have the bandwidth at the moment to add more for AMQP. Can that be taken up as a separate issue instead?

@Tenischev
Copy link
Member

@VaishnaviNandakumar sure it could be done in a separate PR.

Copy link
Member

@Tenischev Tenischev left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@VaishnaviNandakumar there is one veeeery small typo, could you please fix and i believe we will merge it

@@ -15,6 +15,7 @@ repositories {
dependencies {
{%- if asyncapi | isProtocol('amqp') %}
implementation('org.springframework.integration:spring-integration-amqp')
implementation('org.springframework.integration:spring-integration-amqp')
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Why it's doubled?

# Conflicts:
#	template/src/main/java/com/asyncapi/service/MessageHandlerService.java
#	tests/__snapshots__/kafka.test.js.snap
#	tests/__snapshots__/mqtt.test.js.snap
#	tests/__snapshots__/oneOf.test.js.snap
#	tests/__snapshots__/parameters.test.js.snap
@Tenischev Tenischev merged commit 6c611bc into asyncapi:master Oct 7, 2023
@asyncapi-bot
Copy link
Contributor

🎉 This PR is included in version 1.3.0 🎉

The release is available on:

Your semantic-release bot 📦🚀

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Labels
Projects
None yet
Development

Successfully merging this pull request may close these issues.

Updating code for AMQP configuration
3 participants