Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat: Jakarta Messaging, #2989 #3043

Merged
merged 10 commits into from
Dec 1, 2023
Merged
Show file tree
Hide file tree
Changes from 4 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
6 changes: 6 additions & 0 deletions build.sbt
Original file line number Diff line number Diff line change
Expand Up @@ -32,6 +32,7 @@ lazy val alpakka = project
influxdb,
ironmq,
jms,
jakartaJms,
ennru marked this conversation as resolved.
Show resolved Hide resolved
jsonStreaming,
kinesis,
kudu,
Expand Down Expand Up @@ -277,6 +278,9 @@ lazy val ironmq = alpakkaProject(

lazy val jms = alpakkaProject("jms", "jms", Dependencies.Jms, Scala3.settings)

lazy val jakartaJms = alpakkaProject("jakarta-jms", "jakarta-jms", Dependencies.JakartaJms, Scala3.settings)
.settings(mimaPreviousArtifacts := Set.empty) // FIXME remove after first release

lazy val jsonStreaming = alpakkaProject("json-streaming", "json.streaming", Dependencies.JsonStreaming)

lazy val kinesis = alpakkaProject("kinesis", "aws.kinesis", Dependencies.Kinesis).settings(Scala3.settings)
Expand Down Expand Up @@ -427,6 +431,8 @@ lazy val docs = project
"javadoc.java.base_url" -> "https://docs.oracle.com/en/java/javase/11/docs/api/java.base/",
"javadoc.java.link_style" -> "direct",
"javadoc.javax.jms.base_url" -> "https://docs.oracle.com/javaee/7/api/",
"javadoc.jakarta.jms.base_url" -> "https://jakarta.ee/specifications/messaging/3.1/apidocs/jakarta.messaging/",
"javadoc.jakarta.jms.link_style" -> "direct",
"javadoc.com.couchbase.base_url" -> s"https://docs.couchbase.com/sdk-api/couchbase-java-client-${Dependencies.CouchbaseVersion}/",
"javadoc.io.pravega.base_url" -> s"http://pravega.io/docs/${Dependencies.PravegaVersionForDocs}/javadoc/clients/",
"javadoc.org.apache.kudu.base_url" -> s"https://kudu.apache.org/releases/${Dependencies.KuduVersion}/apidocs/",
Expand Down
1 change: 1 addition & 0 deletions docs/src/main/paradox/index.md
Original file line number Diff line number Diff line change
Expand Up @@ -46,6 +46,7 @@ The [Alpakka project](https://doc.akka.io/docs/alpakka/current/) is an open sour
* [IBM DB2 Event Store](external/db2-event-store.md)
* [InfluxDB](influxdb.md)
* [IronMQ](ironmq.md)
* [Jakarta Messaging](jakarta-jms/index.md)
* [JMS](jms/index.md)
* [MongoDB](mongodb.md)
* [MQTT](mqtt.md)
Expand Down
61 changes: 61 additions & 0 deletions docs/src/main/paradox/jakarta-jms/browse.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,61 @@
# Browse

## Browsing messages

The browse source streams the messages in a queue **without consuming them**.

Unlike the other sources, the browse source will complete after browsing all the messages currently on the queue.

Scala
: @@snip [snip](/jakarta-jms/src/test/scala/docs/scaladsl/JmsConnectorsSpec.scala) { #browse-source }

Java
: @@snip [snip](/jakarta-jms/src/test/java/docs/javadsl/JmsConnectorsTest.java) { #browse-source }

A JMS `selector` can be used to filter the messages. Otherwise it will browse the entire content of the queue.


**Notes:**

* Messages may be arriving and expiring while the scan is done.
* The JMS API does not require the content of an enumeration to be a static snapshot of queue content. Whether these changes are visible or not depends on the JMS provider.
* A message must not be returned by a QueueBrowser before its delivery time has been reached.



## Configure JMS browse

To connect to the JMS broker, first define an appropriate @javadoc[jakarta.jms.ConnectionFactory](jakarta.jms.ConnectionFactory). The Alpakka tests and all examples use Active MQ.

Scala
: @@snip [snip](/jakarta-jms/src/test/scala/docs/scaladsl/JmsConnectorsSpec.scala) { #connection-factory }

Java
: @@snip [snip](/jakarta-jms/src/test/java/docs/javadsl/JmsConnectorsTest.java) { #connection-factory }


The created @javadoc[ConnectionFactory](jakarta.jms.ConnectionFactory) is then used for the creation of the different JMS sources.


The `JmsBrowseSettings` factories allow for passing the actor system to read from the default `alpakka.jakarta-jms.browse` section, or you may pass a `Config` instance which is resolved to a section of the same structure.

Scala
: @@snip [snip](/jakarta-jms/src/test/scala/docs/scaladsl/JmsSettingsSpec.scala) { #browse-settings }

Java
: @@snip [snip](/jakarta-jms/src/test/java/docs/javadsl/JmsSettingsTest.java) { #consumer-settings }


The Alpakka Jakarta Messaging browse soruce is configured via default settings in the [HOCON](https://github.com/lightbend/config#using-hocon-the-json-superset) config file section `alpakka.jakarta-jms.browse` in your `application.conf`, and settings may be tweaked in the code using the `withXyz` methods. On the second tab the section from `reference.conf` shows the structure to use for configuring multiple set-ups.
ennru marked this conversation as resolved.
Show resolved Hide resolved

Table
: Setting | Description | Default Value |
------------------------|----------------------------------------------------------------------|---------------------|
connectionFactory | Factory to use for creating JMS connections | Must be set in code |
destination | The queue to browse | Must be set in code |
credentials | JMS broker credentials | Empty |
connectionRetrySettings | Retry characteristics if the connection failed to be established or is taking a long time. | See @ref[Connection Retries](producer.md#connection-retries)

reference.conf
: @@snip [snip](/jakarta-jms/src/main/resources/reference.conf) { #browse }

209 changes: 209 additions & 0 deletions docs/src/main/paradox/jakarta-jms/consumer.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,209 @@
# Consumer

The Alpakka Jakarta Messaging connector offers consuming JMS messages from topics or queues:

* Read `jakarta.jms.Message`s from an Akka Streams source
* Allow for client acknowledgement to the JMS broker
* Allow for JMS transactions
* Read raw JVM types from an Akka Streams Source

The JMS message model supports several types of message bodies in (see @javadoc[jakarta.jms.Message](jakarta.jms.Message)), which may be created directly from the Akka Stream elements, or in wrappers to access more advanced features.


## Receiving messages

@apidoc[jakartajms.*.JmsConsumer$] offers factory methods to consume JMS messages in a number of ways.

This examples shows how to listen to a JMS queue and emit @javadoc[jakarta.jms.Message](jakarta.jms.Message) elements into the stream.
ennru marked this conversation as resolved.
Show resolved Hide resolved

The materialized value @apidoc[jakartajms.*.JmsConsumerControl] is used to shut down the consumer (it is a @apidoc[KillSwitch]) and offers the possibility to inspect the connectivity state of the consumer.

Scala
: @@snip [snip](/jakarta-jms/src/test/scala/docs/scaladsl/JmsConnectorsSpec.scala) { #jms-source }

Java
: @@snip [snip](/jakarta-jms/src/test/java/docs/javadsl/JmsConnectorsTest.java) { #jms-source }


## Configure JMS consumers

To connect to the JMS broker, first define an appropriate @javadoc[jakarta.jms.ConnectionFactory](jakarta.jms.ConnectionFactory). The Alpakka tests and all examples use Active MQ.
ennru marked this conversation as resolved.
Show resolved Hide resolved

Scala
: @@snip [snip](/jakarta-jms/src/test/scala/docs/scaladsl/JmsConnectorsSpec.scala) { #connection-factory }

Java
: @@snip [snip](/jakarta-jms/src/test/java/docs/javadsl/JmsConnectorsTest.java) { #connection-factory }


The created @javadoc[ConnectionFactory](jakarta.jms.ConnectionFactory) is then used for the creation of the different JMS sources.

The @apidoc[jakartajms.JmsConsumerSettings$] factories allow for passing the actor system to read from the default `alpakka.jakarta-jms.consumer` section, or you may pass a `Config` instance which is resolved to a section of the same structure.

Scala
: @@snip [snip](/jakarta-jms/src/test/scala/docs/scaladsl/JmsSettingsSpec.scala) { #consumer-settings }

Java
: @@snip [snip](/jakarta-jms/src/test/java/docs/javadsl/JmsSettingsTest.java) { #consumer-settings }

The Alpakka Jakarta Messaging consumer is configured via default settings in the [HOCON](https://github.com/lightbend/config#using-hocon-the-json-superset) config file section `alpakka.jakarta-jms.consumer` in your `application.conf`, and settings may be tweaked in the code using the `withXyz` methods. On the second tab the section from `reference.conf` shows the structure to use for configuring multiple set-ups.

Table
: Setting | Description | Default Value |
------------------------|----------------------------------------------------------------------|---------------------|
connectionFactory | Factory to use for creating JMS connections | Must be set in code |
destination | Destination (queue or topic) to send JMS messages to | Must be set in code |
credentials | JMS broker credentials | Empty |
connectionRetrySettings | Retry characteristics if the connection failed to be established or is taking a long time. | See @ref[Connection Retries](producer.md#connection-retries)
sessionCount | Number of parallel sessions to use for receiving JMS messages. | defaults to `1` |
bufferSize | Maximum number of messages to prefetch before applying backpressure. | 100 |
ackTimeout | For use with JMS transactions, only: maximum time given to a message to be committed or rolled back. | 1 second |
maxAckInterval | For use with AckSource, only: The max duration before the queued acks are sent to the broker | Empty |
maxPendingAcks | For use with AckSource, only: The amount of acks that get queued before being sent to the broker | 100 |
selector | JMS selector expression (see [below](#using-jms-selectors)) | Empty |
connectionStatusSubscriptionTimeout | 5 seconds | Time to wait for subscriber of connection status events before starting to discard them |

reference.conf
: @@snip [snip](/jakarta-jms/src/main/resources/reference.conf) { #consumer }


### Broker specific destinations

To reach out to special features of the JMS broker, destinations can be created as `CustomDestination` which takes a factory method for creating destinations.

Scala
: @@snip [snip](/jakarta-jms/src/test/scala/docs/scaladsl/JmsConnectorsSpec.scala) { #custom-destination }

Java
: @@snip [snip](/jakarta-jms/src/test/java/docs/javadsl/JmsConnectorsTest.java) { #custom-destination }


## Using JMS client acknowledgement

Client acknowledgement ensures a message is successfully received by the consumer and notifies the JMS broker for every message. Due to the threading details in JMS brokers, this special source is required (see the explanation below).

Scala
: @@snip [snip](/jakarta-jms/src/test/scala/docs/scaladsl/JmsBufferedAckConnectorsSpec.scala) { #source }

Java
: @@snip [snip](/jakarta-jms/src/test/java/docs/javadsl/JmsBufferedAckConnectorsTest.java) { #source }

The `sessionCount` parameter controls the number of JMS sessions to run in parallel.


**Notes:**

* Using multiple sessions increases throughput, especially if acknowledging message by message is desired.
* Messages may arrive out of order if `sessionCount` is larger than 1.
* Message-by-message acknowledgement can be achieved by setting `bufferSize` to 0, thus disabling buffering. The outstanding messages before backpressure will be the `sessionCount`.
* If buffering is enabled then it's possible for messages to remain in the buffer and never be acknowledged (or acknowledged after a long time) when no new elements arrive to reach the `maxPendingAcks` threshold. By setting `maxAckInterval` messages will be acknowledged after the defined interval or number of pending acks, whichever comes first.
* The default `AcknowledgeMode` is `ClientAcknowledge` but can be overridden to custom `AcknowledgeMode`s, even implementation-specific ones by setting the `AcknowledgeMode` in the `JmsConsumerSettings` when creating the stream.

@@@ warning

Using a regular `JmsConsumer` with `AcknowledgeMode.ClientAcknowledge` and using `message.acknowledge()` from the stream is not compliant with the JMS specification and can cause issues for some message brokers. `message.acknowledge()` in many cases acknowledges the session and not the message itself, contrary to what the API makes you believe.

Use this `JmsConsumer.ackSource` as shown above instead.

@@@


## Using JMS transactions

JMS transactions may be used with this connector. Be aware that transactions are a heavy-weight tool and may not perform very good.

Scala
: @@snip [snip](/jakarta-jms/src/test/scala/docs/scaladsl/JmsTxConnectorsSpec.scala) { #source }

Java
: @@snip [snip](/jakarta-jms/src/test/java/docs/javadsl/JmsTxConnectorsTest.java) { #source }

The `sessionCount` parameter controls the number of JMS sessions to run in parallel.

The `ackTimeout` parameter controls the maximum time given to a message to be committed or rolled back. If the message times out it will automatically be rolled back. This is to prevent stream from starvation if the application fails to commit or rollback a message, or if the message errors out and the stream is resumed by a `decider`.

**Notes:**

* Higher throughput is achieved by increasing the `sessionCount`.
* Messages will arrive out of order if `sessionCount` is larger than 1.
* Buffering is not supported in transaction mode. The `bufferSize` is ignored.
* The default `AcknowledgeMode` is `SessionTransacted` but can be overridden to custom `AcknowledgeMode`s, even implementation-specific ones by setting the `AcknowledgeMode` in the `JmsConsumerSettings` when creating the stream.



## Using JMS selectors

Create a @javadoc[jakarta.jms.Message](jakarta.jms.Message) source specifying a [JMS selector expression](https://docs.oracle.com/cd/E19798-01/821-1841/bncer/index.html):
Verify that we are only receiving messages according to the selector:

Scala
: @@snip [snip](/jakarta-jms/src/test/scala/docs/scaladsl/JmsConnectorsSpec.scala) { #source-with-selector }

Java
: @@snip [snip](/jakarta-jms/src/test/java/docs/javadsl/JmsConnectorsTest.java) { #source-with-selector }


## Raw JVM type sources

| Stream element type | Alpakka source factory |
|-----------------------------------------------------------|--------------------------|
| `String` | [`JmsConsumer.textSource`](#text-sources) |
| @scala[`Array[Byte]`]@java[`byte[]`] | [`JmsConsumer.bytesSource`](#byte-array-sources) |
| @scala[`Map[String, AnyRef]`]@java[`Map<String, Object>`] | [`JmsConsumer.mapSource`](#map-messages-sources) |
| `Object` (`java.io.Serializable`) | [`JmsConsumer.objectSource`](#object-sources) |

### Text sources

The `textSource` emits the received message body as String:

Scala
: @@snip [snip](/jakarta-jms/src/test/scala/docs/scaladsl/JmsConnectorsSpec.scala) { #text-source }

Java
: @@snip [snip](/jakarta-jms/src/test/java/docs/javadsl/JmsConnectorsTest.java) { #text-source }


### Byte array sources

The `bytesSource` emits the received message body as byte array:

Scala
: @@snip [snip](/jakarta-jms/src/test/scala/docs/scaladsl/JmsConnectorsSpec.scala) { #bytearray-source }

Java
: @@snip [snip](/jakarta-jms/src/test/java/docs/javadsl/JmsConnectorsTest.java) { #bytearray-source }


### Map sources

The `mapSource` emits the received message body as @scala[Map[String, Object]]@java[Map<String, Object>]:

Scala
: @@snip [snip](/jakarta-jms/src/test/scala/docs/scaladsl/JmsConnectorsSpec.scala) { #map-source }

Java
: @@snip [snip](/jakarta-jms/src/test/java/docs/javadsl/JmsConnectorsTest.java) { #map-source }


### Object sources

The `objectSource` emits the received message body as deserialized JVM instance. As serialization may be a security concern, JMS clients require special configuration to allow this. The example shows how to configure ActiveMQ connection factory to support serialization. See [ActiveMQ Security](https://activemq.apache.org/objectmessage.html) for more information on this.

Scala
: @@snip [snip](/jakarta-jms/src/test/scala/docs/scaladsl/JmsConnectorsSpec.scala) { #object-source }

Java
: @@snip [snip](/jakarta-jms/src/test/java/docs/javadsl/JmsConnectorsTest.java) { #object-source }


## Request / Reply

The request / reply pattern can be implemented by streaming a @apidoc[jakartajms.*.JmsConsumer$]
to a @apidoc[jakartajms.*.JmsProducer$],
with a stage in between that extracts the `ReplyTo` and `CorrelationID` from the original message and adds them to the response.

Scala
: @@snip [snip](/jakarta-jms/src/test/scala/docs/scaladsl/JmsConnectorsSpec.scala) { #request-reply }

Java
: @@snip [snip](/jakarta-jms/src/test/java/docs/javadsl/JmsConnectorsTest.java) { #request-reply }
41 changes: 41 additions & 0 deletions docs/src/main/paradox/jakarta-jms/index.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,41 @@
# Jakarta Messaging (JMS)

@@@ note { title="Jakarta Messaging (JMS)" }

The Jakarta Messaging API (formerly Java Message Service or JMS API) is a Java application programming interface (API) for message-oriented middleware. It provides generic messaging models, able to handle the producer–consumer problem, that can be used to facilitate the sending and receiving of messages between software systems. Jakarta Messaging is a part of [Jakarta EE]((https://jakarta.ee)) and was originally defined by a specification developed at Sun Microsystems before being guided by the Java Community Process.

-- [Wikipedia](https://en.wikipedia.org/wiki/Jakarta_Messaging)

@@@

The Alpakka Jakarta Messaging connector provides Akka Stream sources and sinks to connect to Jakarta Messaging providers.

@@project-info{ projectId="jakarta-jms" }

## Artifacts

The Akka dependencies are available from Akka's library repository. To access them there, you need to configure the URL for this repository.

@@repository [sbt,Maven,Gradle] {
id="akka-repository"
name="Akka library repository"
url="https://repo.akka.io/maven"
}

Additionally, add the dependencies as below.

@@dependency [sbt,Maven,Gradle] {
group=com.lightbend.akka
artifact=akka-stream-alpakka-jakarta-jms_$scala.binary.version$
version=$project.version$
}

@@toc { depth=2 }

@@@ index

* [p](producer.md)
* [c](consumer.md)
* [c](browse.md)

@@@
Loading
Loading