Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Embrace queue APIs in Spring's JDBC message stores for polling messages #3872

Closed
raphw opened this issue Aug 8, 2022 · 17 comments
Closed

Embrace queue APIs in Spring's JDBC message stores for polling messages #3872

raphw opened this issue Aug 8, 2022 · 17 comments

Comments

@raphw
Copy link
Contributor

raphw commented Aug 8, 2022

Modern databases support much better queuing APIs than what they offered years ago, so articles like the one included in the Spring integration documentation are rather dated.

For example, Postgres offers LISTEN and NOTIFY or Oracle database change norification. These APIs would allow for a more efficient Queue usage of the JDBC queues that are offered, if a database offers it.

@raphw raphw added status: waiting-for-triage The issue need to be evaluated and its future decided type: enhancement labels Aug 8, 2022
@artembilan
Copy link
Member

Yes, we may consider that as a contribution, for example like spring-integration-postgresql and spring-integration-oracle to implement those vendor-specific features.
It might be not as a store impl, but rather just PostgresMessageChannel and perhaps message-driven channel adapter to react to notifications and emit respective data.

Contribution is welcome!

@raphw
Copy link
Contributor Author

raphw commented Aug 10, 2022

Thanks, I will look into it and try to solve this for my own application first and then try to extract it to a pull request.

@artembilan artembilan added this to the Backlog milestone Aug 10, 2022
@artembilan artembilan added in: jdbc and removed status: waiting-for-triage The issue need to be evaluated and its future decided labels Aug 10, 2022
@raphw
Copy link
Contributor Author

raphw commented Aug 13, 2022

I played around with this on Oracle and Postgres and here are some observations:

  1. The Oracle JDBC driver is rather buggy, on its newest version, the driver fails with an internal assertion error upon trying to register a change notification. With an older version, I get it to work nicely, but only if the Oracle database runs on the same host as the JVM. There is a property NTF_LOCAL_HOST to define a host name that the Oracle DB should send notifications to, but it is falsely submitted by the JDBC driver. I got around this as well by registering the change notification in a PL/SQL script that I run via JDBC from which I call back via TCP. It requires some effort to run this setup and I found it rather limiting as the Oracle DB must be able to call the JVM. In most setups with Kubernetes, it is for example not possible to callback a specific replica. Also, often it requires to change firewall setup.

  2. The official Postgres JDBC driver only supports blocking polls but I assume that would not be an issue as that represents the queue API. However, it consumes a full connection.

  3. There is an alternative Postgres JDBC driver which does support an asynchronous notification. This way, it would be possible to use a single JDBC connection for multiple queues. Unfortunately, the project seems abandoned.

  4. The same is true for the Vertx Postgres client, which uses Netty and is better maintained.

What would you consider to be in-scope? I'd suggest to aim for Postgres support only, at least for a first version.

I am further wondering if this should rather be implemented as a pollable channel or as a PublishSubscribeChannel. Both is possible, of course, I wonder what would be the more appropriate abstraction. As the standard JDBC driver for Postgres is blocking, though, I would consider the pollable channel, though.

@artembilan
Copy link
Member

Thank you, @raphw , for investigation.
For Oracle I heard about Advance Queuing, but looks like it can work via standard JMS protocol: https://docs.oracle.com/database/121/ADQUE/aq_intro.htm#ADQUE2440.
And for that part we already have a specific MessageChannel implementation: https://docs.spring.io/spring-integration/docs/current/reference/html/jms.html#jms-channel.

What you say for PostgreSQL and its blocking polls does not make too much difference for me with what we have so far with a QueueChannel and JdbcChannelMessageStore combination.

So, probably we come back to square one when we have everything what we can, but provides don't give us enough flexibility to extend for their features.
Therefore I fill like this one is Won't Fix and Invalid 😄

@artembilan artembilan added the status: waiting-for-reporter Needs a feedback from the reporter label Aug 15, 2022
@raphw
Copy link
Contributor Author

raphw commented Aug 15, 2022

Indeed, Oracle AQ is JMS compatible, I don't think that needs explicit support.

As for Postgres, the difference to using a QueueChannel is that the queue channel uses an explicit lock which is only released if you add a message on the same JVM. If you for example run replicas of an application, other replicas that do not add messages to the queue will poll forever without ever releasing their locks and remain stale.

The common workaround is to poll the group every X seconds, but with Postgres's notify/listen API, this is not necessary as all listening JVMs will receive information on a new message being sent to the queue. If one replica sends 1000 messages to the queue, with this approach all other replicas will immediately wake up and start processing what is the desired behavior in my case.

(I prototyped this now with a custom component and it works like a charm.)

@artembilan
Copy link
Member

I'm confused: you said in your previous comment that "Postgres JDBC driver only supports blocking polls", so we are in a situation to re-implement a PollableChannel, which is, essentially, no difference with the currently polling QueueChannel.
And you talk about notify/listen API again.
So, what do I miss from your investigation?
Doesn't look like you agreed about some out-dated libraries to rely on...

@raphw
Copy link
Contributor Author

raphw commented Aug 15, 2022

In few words: the difference is that the current solution notifies on a JVM level, the proposed solution notifies on a database level.

If JVM A started polling from a group, it would either block forever or would need to poll in intervals. Because if JVM B sent a message, there's no mechanism to push these notifications to the other JVM. LISTEN/NOTIFY on the other hand allows for an immediate reaction on JVM A if JVM B added a message.

@artembilan
Copy link
Member

I understand what you are saying and that's why it is called "polling".
Our current impl in the QueueChannel for external store is like this:

		try {
			message = doPoll();
			while (message == null && timeoutInNanos > 0) {
				timeoutInNanos = this.messageStoreNotEmpty.awaitNanos(timeoutInNanos);
				message = doPoll();
			}
		}

Just because there is no a notification hook from the DB to react on.
Therefore we have to poll in a loop.

What you describer with the LISTEN/NOTIFY is already a SubscribableChannel, but not a polling.
Not sure why we are still not on the same page...

This is your words:

As the standard JDBC driver for Postgres is blocking, though, I would consider the pollable channel, though.

That's why I want to get a clear answer from you: what we are going to implement - the polling channel which is already there, or SubscribableChannel which is going to react for some notification.

@raphw
Copy link
Contributor Author

raphw commented Aug 15, 2022

Well, the implementation I am out after is a Java Queue that I can plug into an existing API. In the current implementation of the poll method, this delegates to the code your quoting. I was out after replacing the messageStoreNotEmpty lock with a database based one. But a subscribable channel is of course also possible. I was nore trying to explain why the current API in Soring is not equivalent as you suggested.

@artembilan
Copy link
Member

Sure! It really can be a custom Queue impl which we can inject into the QueueChannel.
Or, as I understood from your explanation, a SubscribableChannel to initiate a LISTEN on one when we subscribe to this channel and NOTIFY when we send().

@raphw
Copy link
Contributor Author

raphw commented Aug 15, 2022

Yeah, that's what I am currently thinking would be the best solution. If one wanted to map this into a Java Queue, that would still be possible by a subscriber that adds to a regular queue.

I will try to make a prototype out of this!

@raphw
Copy link
Contributor Author

raphw commented Sep 2, 2022

I have created something that hooks into Spring Integration in a way that works for me. I wanted to start out with something minimal that depicts what I am aiming at without any details: https://github.com/raphw/spring-integration/tree/postgres-notification-listener

A listener like this will trigger JVM B immediately if listening for a group message send by JVM A. As things stand, the run method of PostgresChannelMessageSubscriber must be run by some executor. Listeners will then be provided with the message being received. It does not matter how the messages are added to the message table due to using a trigger on the table that invokes NOTIFY on each insert.

In a first attempt, I tried to implement the SubscribableChannel interface, but it did not really fit. LISTEN is rather expensive as it consumes a full connection, so ideally this should be run as a singleton in the Spring context and not once for each subscription. Sharing is possible as the NOTIFY sends the groupKey as its payload.

Any suggestions how this might fit into the existing landscape? I read quite some code to figure this out myself, but it's a lot of hierarchies, so I was wondering if someone who is more experienced with the code base could make a better consideration.

@artembilan
Copy link
Member

artembilan commented Sep 2, 2022

At a glance looks cool!

I don't think we need to mix poll missed and notify concerns in one class.
I see it like this:

  1. A single global PostgresChannelMessageTableSubscriber. Since you say it block the whole connection, so indeed it is better to have one per table.
  2. The notifier must emit not only NEW.GROUP_KEY, but also NEW.REGION (if that possible, of course). Different regions may have the same group key.
  3. The SubscribableChannel may implement something like PostgresListener to be added to the provided PostgresChannelMessageTableSubscriber. How - we will figure out on the way. Probably with some attributes for filtering like those group key and region.
  4. This channel may indeed implement that "missed messages" feature in its start() contract.

My point is to avoid a JdbcChannelMessageStore and MessageHandler abstractions from this new PostgresChannelMessageTableSubscriber.

WDYT?

@raphw
Copy link
Contributor Author

raphw commented Sep 2, 2022

Yeah, that sounds like a better idea as it allows the subscriber to decide on the polling.

One issue I found however was mapping back from groupKey to groupId, if one wanted to provide the latter as an argument to the subscriber.

@artembilan
Copy link
Member

Not sure what you mean. The channel must be supplied with that groupKey and it is stored as is (string) into DB.
This key has to be a contract of the PostgresListener we are going to implement on the PostgresSubscribableChannel.
We also may consider to make it based on a bean name: see BeanNameAware DI hook.

@raphw
Copy link
Contributor Author

raphw commented Sep 3, 2022

If listening to inserts via notify, only database values can be supplied to the NOTIFY call, GROUP_KEY in this case. On LISTEN, this key is available, but not the object which is normally supplied when sending messages.

I had hoped to supply offer a listener interface where region and groupId for a message were provided to the listener, but only the stringified key is available.

@raphw
Copy link
Contributor Author

raphw commented Sep 3, 2022

I redid the suggestion, maybe that's a better approach. Please have another look!

@artembilan artembilan removed the status: waiting-for-reporter Needs a feedback from the reporter label Sep 8, 2022
@artembilan artembilan modified the milestones: Backlog, 6.0.0-M5 Sep 20, 2022
artembilan pushed a commit that referenced this issue Sep 21, 2022
Fixes #3872

* Add documentation for `PostgresSubscribableChannel` and related components
* Add links, example and what's new section.
* Docs clean up
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Projects
None yet
Development

No branches or pull requests

2 participants