Skip to content

Commit

Permalink
Ox integration: SSE (#2208)
Browse files Browse the repository at this point in the history
  • Loading branch information
kciesielski authored Jun 12, 2024
1 parent c1b3a2a commit c9a9c39
Show file tree
Hide file tree
Showing 3 changed files with 87 additions and 0 deletions.
30 changes: 30 additions & 0 deletions docs/backends/synchronous.md
Original file line number Diff line number Diff line change
Expand Up @@ -92,3 +92,33 @@ Synchronous backends don't support non-blocking [streaming](../requests/streamin
## Websockets

Both HttpClient and OkHttp backends support regular [websockets](../websockets.md).

## Server-sent events

[Ox](https://ox.softwaremill.com) is a Scala 3 toolkit that allows you to handle concurrency and resiliency in direct-style, leveraging Java 21 virtual threads. If you're using Ox with `sttp`, you can handle SSE as a `Source[ServerSentEvent]`:

```
// sbt dependency
"com.softwaremill.sttp.client4" %% "ox" % "@VERSION@",
```

```scala
import ox.*
import ox.channels.Source
import sttp.client4.*
import sttp.client4.impl.ox.sse.OxServerSentEvents
import sttp.model.sse.ServerSentEvent
import java.io.InputStream

def handleSse(is: InputStream)(using IO): Unit =
supervised {
OxServerSentEvents.parse(is).foreach(event => println(s"Received event: $event"))
}

val backend = DefaultSyncBackend()
IO.unsafe:
basicRequest
.get(uri"https://postman-echo.com/server-events/3")
.response(asInputStreamAlways(handleSse))
.send(backend)
```
Original file line number Diff line number Diff line change
@@ -0,0 +1,22 @@
package sttp.client4.impl.ox.sse

import ox.*
import ox.channels.Source
import sttp.model.sse.ServerSentEvent

import java.io.InputStream

object OxServerSentEvents:
def parse(is: InputStream)(using Ox, IO): Source[ServerSentEvent] =
Source
.fromInputStream(is)
.linesUtf8
.mapStatefulConcat(() => List.empty[String])(
(lines, str) => if str.isEmpty then (Nil, List(lines)) else (lines :+ str, Nil),
onComplete = { lines =>
if lines.nonEmpty then Some(lines)
else None
}
)
.filter(_.nonEmpty)
.map(ServerSentEvent.parse)
35 changes: 35 additions & 0 deletions effects/ox/src/test/scala/sttp/client4/impl/ox/ws/OxSseTest.scala
Original file line number Diff line number Diff line change
@@ -0,0 +1,35 @@
package sttp.client4.impl.ox.sse

import org.scalatest.BeforeAndAfterAll
import org.scalatest.flatspec.AnyFlatSpec
import org.scalatest.matchers.should.Matchers
import ox.*
import ox.IO.globalForTesting.given
import sttp.client4.*
import sttp.client4.testing.HttpTest.*
import sttp.model.sse.ServerSentEvent

class OxServerSentEventsTest extends AnyFlatSpec with Matchers with BeforeAndAfterAll:

lazy val backend: WebSocketSyncBackend = DefaultSyncBackend()

behavior of "OxServerSentEvents"

it should "parse SSEs" in supervised {
val sseData = "text1 in line1\ntext2 in line2"
val expectedEvent = ServerSentEvent(data = Some(sseData), eventType = Some("test-event"), retry = Some(42000))
val expectedEvents =
Seq(expectedEvent.copy(id = Some("1")), expectedEvent.copy(id = Some("2")), expectedEvent.copy(id = Some("3")))
basicRequest
.post(uri"$endpoint/sse/echo3")
.body(sseData)
.response(asInputStreamAlways { is =>
OxServerSentEvents.parse(is).take(3).toList shouldBe(expectedEvents)
()
})
.send(backend)
}

override protected def afterAll(): Unit =
backend.close()
super.afterAll()

0 comments on commit c9a9c39

Please sign in to comment.