From c9a9c390926af4e7f8bc2c973a720fe68f04bfb5 Mon Sep 17 00:00:00 2001 From: Krzysztof Ciesielski Date: Wed, 12 Jun 2024 15:07:45 +0200 Subject: [PATCH] Ox integration: SSE (#2208) --- docs/backends/synchronous.md | 30 ++++++++++++++++ .../impl/ox/sse/OxServerSentEvents.scala | 22 ++++++++++++ .../sttp/client4/impl/ox/ws/OxSseTest.scala | 35 +++++++++++++++++++ 3 files changed, 87 insertions(+) create mode 100644 effects/ox/src/main/scala/sttp/client4/impl/ox/sse/OxServerSentEvents.scala create mode 100644 effects/ox/src/test/scala/sttp/client4/impl/ox/ws/OxSseTest.scala diff --git a/docs/backends/synchronous.md b/docs/backends/synchronous.md index 35f6f3a309..16e94dad64 100644 --- a/docs/backends/synchronous.md +++ b/docs/backends/synchronous.md @@ -92,3 +92,33 @@ Synchronous backends don't support non-blocking [streaming](../requests/streamin ## Websockets Both HttpClient and OkHttp backends support regular [websockets](../websockets.md). + +## Server-sent events + +[Ox](https://ox.softwaremill.com) is a Scala 3 toolkit that allows you to handle concurrency and resiliency in direct-style, leveraging Java 21 virtual threads. If you're using Ox with `sttp`, you can handle SSE as a `Source[ServerSentEvent]`: + +``` +// sbt dependency +"com.softwaremill.sttp.client4" %% "ox" % "@VERSION@", +``` + +```scala +import ox.* +import ox.channels.Source +import sttp.client4.* +import sttp.client4.impl.ox.sse.OxServerSentEvents +import sttp.model.sse.ServerSentEvent +import java.io.InputStream + +def handleSse(is: InputStream)(using IO): Unit = + supervised { + OxServerSentEvents.parse(is).foreach(event => println(s"Received event: $event")) + } + +val backend = DefaultSyncBackend() +IO.unsafe: + basicRequest + .get(uri"https://postman-echo.com/server-events/3") + .response(asInputStreamAlways(handleSse)) + .send(backend) +``` diff --git a/effects/ox/src/main/scala/sttp/client4/impl/ox/sse/OxServerSentEvents.scala b/effects/ox/src/main/scala/sttp/client4/impl/ox/sse/OxServerSentEvents.scala new file mode 100644 index 0000000000..afd4db2297 --- /dev/null +++ b/effects/ox/src/main/scala/sttp/client4/impl/ox/sse/OxServerSentEvents.scala @@ -0,0 +1,22 @@ +package sttp.client4.impl.ox.sse + +import ox.* +import ox.channels.Source +import sttp.model.sse.ServerSentEvent + +import java.io.InputStream + +object OxServerSentEvents: + def parse(is: InputStream)(using Ox, IO): Source[ServerSentEvent] = + Source + .fromInputStream(is) + .linesUtf8 + .mapStatefulConcat(() => List.empty[String])( + (lines, str) => if str.isEmpty then (Nil, List(lines)) else (lines :+ str, Nil), + onComplete = { lines => + if lines.nonEmpty then Some(lines) + else None + } + ) + .filter(_.nonEmpty) + .map(ServerSentEvent.parse) diff --git a/effects/ox/src/test/scala/sttp/client4/impl/ox/ws/OxSseTest.scala b/effects/ox/src/test/scala/sttp/client4/impl/ox/ws/OxSseTest.scala new file mode 100644 index 0000000000..07df17f912 --- /dev/null +++ b/effects/ox/src/test/scala/sttp/client4/impl/ox/ws/OxSseTest.scala @@ -0,0 +1,35 @@ +package sttp.client4.impl.ox.sse + +import org.scalatest.BeforeAndAfterAll +import org.scalatest.flatspec.AnyFlatSpec +import org.scalatest.matchers.should.Matchers +import ox.* +import ox.IO.globalForTesting.given +import sttp.client4.* +import sttp.client4.testing.HttpTest.* +import sttp.model.sse.ServerSentEvent + +class OxServerSentEventsTest extends AnyFlatSpec with Matchers with BeforeAndAfterAll: + + lazy val backend: WebSocketSyncBackend = DefaultSyncBackend() + + behavior of "OxServerSentEvents" + + it should "parse SSEs" in supervised { + val sseData = "text1 in line1\ntext2 in line2" + val expectedEvent = ServerSentEvent(data = Some(sseData), eventType = Some("test-event"), retry = Some(42000)) + val expectedEvents = + Seq(expectedEvent.copy(id = Some("1")), expectedEvent.copy(id = Some("2")), expectedEvent.copy(id = Some("3"))) + basicRequest + .post(uri"$endpoint/sse/echo3") + .body(sseData) + .response(asInputStreamAlways { is => + OxServerSentEvents.parse(is).take(3).toList shouldBe(expectedEvents) + () + }) + .send(backend) + } + + override protected def afterAll(): Unit = + backend.close() + super.afterAll()