Skip to content

Commit

Permalink
RS compatibility
Browse files Browse the repository at this point in the history
  • Loading branch information
adamw committed Nov 21, 2024
1 parent 49d6802 commit b002262
Show file tree
Hide file tree
Showing 4 changed files with 47 additions and 2 deletions.
3 changes: 2 additions & 1 deletion build.sbt
Original file line number Diff line number Diff line change
Expand Up @@ -112,5 +112,6 @@ lazy val documentation: Project = (project in file("generated-doc")) // importan
.dependsOn(
core,
kafka,
mdcLogback
mdcLogback,
flowReactiveStreams
)
3 changes: 2 additions & 1 deletion core/src/main/scala/ox/flow/FlowReactiveOps.scala
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,8 @@ trait FlowReactiveOps[+T]:

/** Converts this [[Flow]] into a [[Publisher]]. The flow is run every time the publisher is subscribed to.
*
* Must be run within a concurrency scope, as upon subscribing, a fork is created to run the publishing process.
* Must be run within a concurrency scope, as upon subscribing, a fork is created to run the publishing process. Hence, the scope should
* remain active as long as the publisher is used.
*
* Elements emitted by the flow are buffered, using a buffer of capacity given by the [[BufferCapacity]] in scope.
*
Expand Down
26 changes: 26 additions & 0 deletions doc/streaming/flows.md
Original file line number Diff line number Diff line change
Expand Up @@ -153,4 +153,30 @@ import ox.flow.Flow
Flow.fromValues(1, 2, 3)
.tap(n => println(s"Received: $n"))
.runToList()
```

## Reactive streams interoperability

A `Flow` can be converted to a `java.util.concurrent.Flow.Publisher` using the `.toPublisher` method.

This needs to be run within an `Ox` concurrency scope, as upon subscribing, a fork is created to run the publishing
process. Hence, the scope should remain active as long as the publisher is used.

Internally, elements emitted by the flow are buffered, using a buffer of capacity given by the `BufferCapacity` in
scope.

To obtain a `org.reactivestreams.Publisher` instance, you'll need to add the following dependency and import, to
bring the `toReactiveStreamsPublisher` method into scope:

```scala mdoc:compile-only
// sbt dependency: "com.softwaremill.ox" %% "flow-reactive-streams" % "@VERSION@"

import ox.supervised
import ox.flow.Flow
import ox.flow.reactive.*

val myFlow: Flow[Int] = ???
supervised:
myFlow.toReactiveStreamsPublisher: org.reactivestreams.Publisher[Int]
// use the publisher
```
Original file line number Diff line number Diff line change
@@ -0,0 +1,17 @@
package ox.flow.reactive

import ox.flow.Flow
import org.reactivestreams.Publisher
import ox.Ox
import ox.channels.BufferCapacity
import org.reactivestreams.FlowAdapters

extension [A](flow: Flow[A])
/** This variant returns an implementation of `org.reactivestreams.Publisher`, as opposed to `java.util.concurrent.Flow.Publisher` which
* is supported in the core module.
*
* @see
* [[Flow.toPublisher]]
*/
def toReactiveStreamsPublisher(using Ox, BufferCapacity): Publisher[A] =
FlowAdapters.toPublisher(flow.toPublisher)

0 comments on commit b002262

Please sign in to comment.