From f96a5ca68b7715195aedb48877f601dbc2a83711 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Ra=C3=BAl=20Piaggio?= Date: Mon, 22 Jul 2024 12:53:38 -0300 Subject: [PATCH] unify connect+initialize --- build.sbt | 7 +- .../scala/clue/PersistentClientStatus.scala | 2 - core/src/main/scala/clue/clients.scala | 10 +- core/src/main/scala/clue/package.scala | 3 + .../scala/clue/websocket/ApolloClient.scala | 512 ++++++------------ .../main/scala/clue/websocket/Emitter.scala | 16 + .../src/main/scala/clue/websocket/State.scala | 34 ++ .../main/resources/simplelogger.properties | 1 + .../main/scala/clue/http4sjdkDemo/Demo.scala | 14 +- 9 files changed, 240 insertions(+), 359 deletions(-) create mode 100644 core/src/main/scala/clue/websocket/Emitter.scala create mode 100644 core/src/main/scala/clue/websocket/State.scala create mode 100644 http4s-jdk-demo/src/main/resources/simplelogger.properties diff --git a/build.sbt b/build.sbt index 23c6e05c..a247e0b2 100644 --- a/build.sbt +++ b/build.sbt @@ -5,7 +5,7 @@ lazy val scala3Version = "3.4.2" lazy val rulesCrossVersions = Seq(V.scala213) lazy val allVersions = rulesCrossVersions :+ scala3Version -ThisBuild / tlBaseVersion := "0.39" +ThisBuild / tlBaseVersion := "0.40" ThisBuild / tlCiReleaseBranches := Seq("master") ThisBuild / tlJdkRelease := Some(8) ThisBuild / githubWorkflowJavaVersions := Seq("11", "17").map(JavaSpec.temurin(_)) @@ -92,8 +92,9 @@ lazy val http4sJDKDemo = project .in(file("http4s-jdk-demo")) .enablePlugins(NoPublishPlugin) .settings( - moduleName := "clue-http4s-jdk-client-demo", - tlJdkRelease := Some(11), + moduleName := "clue-http4s-jdk-client-demo", + tlJdkRelease := Some(11), + Compile / run / fork := true, libraryDependencies ++= Seq( "org.typelevel" %% "log4cats-slf4j" % Settings.LibraryVersions.log4Cats, "org.slf4j" % "slf4j-simple" % "2.0.13" diff --git a/core/src/main/scala/clue/PersistentClientStatus.scala b/core/src/main/scala/clue/PersistentClientStatus.scala index ad8792d0..5ec88c2b 100644 --- a/core/src/main/scala/clue/PersistentClientStatus.scala +++ b/core/src/main/scala/clue/PersistentClientStatus.scala @@ -10,8 +10,6 @@ sealed trait PersistentClientStatus object PersistentClientStatus { case object Connecting extends PersistentClientStatus case object Connected extends PersistentClientStatus - case object Initializing extends PersistentClientStatus - case object Initialized extends PersistentClientStatus case object Disconnected extends PersistentClientStatus implicit val eqStreamingClientStatus: Eq[PersistentClientStatus] = Eq.fromUniversalEquals diff --git a/core/src/main/scala/clue/clients.scala b/core/src/main/scala/clue/clients.scala index 9d128da6..1d65ef75 100644 --- a/core/src/main/scala/clue/clients.scala +++ b/core/src/main/scala/clue/clients.scala @@ -118,21 +118,15 @@ trait StreamingClient[F[_], S] extends FetchClientWithPars[F, Unit, S] { * A client that keeps a connection open and initializable protocol with the server. */ trait PersistentClient[F[_], CP, CE] { - // protected val backend: PersistentBackend[F, CP, CE] - // protected val reconnectionStrategy: ReconnectionStrategy[CE] - def status: F[PersistentClientStatus] def statusStream: fs2.Stream[F, PersistentClientStatus] - def connect(): F[Unit] // Initialization may repeat upon reconnection, that's why the payload is effectful since it may change over time (eg: auth tokens). - def initialize(payload: F[Map[String, Json]]): F[Unit] + def connect(payload: F[Map[String, Json]]): F[Unit] + def connect(): F[Unit] - def terminate(): F[Unit] def disconnect(closeParameters: CP): F[Unit] def disconnect(): F[Unit] - - def reestablish(): F[Unit] } /** diff --git a/core/src/main/scala/clue/package.scala b/core/src/main/scala/clue/package.scala index 81ade3c8..443c19bc 100644 --- a/core/src/main/scala/clue/package.scala +++ b/core/src/main/scala/clue/package.scala @@ -52,6 +52,9 @@ package object clue { def logAndRaiseF_[F[_], A](implicit F: MonadError[F, Throwable], logger: Logger[F]): F[A] = logger.error(t)("") >> F.raiseError[A](t) + def raiseF[F[_]](implicit F: MonadError[F, Throwable]): F[Unit] = + F.raiseError(t) + def logF[F[_]]( msg: String )(implicit logger: Logger[F]): F[Unit] = diff --git a/core/src/main/scala/clue/websocket/ApolloClient.scala b/core/src/main/scala/clue/websocket/ApolloClient.scala index 77c6f647..b946da30 100644 --- a/core/src/main/scala/clue/websocket/ApolloClient.scala +++ b/core/src/main/scala/clue/websocket/ApolloClient.scala @@ -11,6 +11,8 @@ import cats.effect.std.Queue import cats.effect.std.UUIDGen import cats.syntax.all.* import clue.* +import clue.StringOps +import clue.ThrowableOps import clue.model.GraphQLErrors import clue.model.GraphQLRequest import clue.model.GraphQLResponse @@ -23,54 +25,6 @@ import io.circe.parser.* import org.typelevel.log4cats.Logger import java.util.UUID -import scala.concurrent.duration.FiniteDuration - -protected[clue] trait Emitter[F[_]] { - val request: GraphQLRequest[JsonObject] - - def emitData(response: GraphQLResponse[Json]): F[Unit] - def emitErrors(errors: GraphQLErrors): F[Unit] - val halt: F[Unit] -} - -// Client internal state for the FSM. -// We keep a connectionId throughout all states to ensure that callback events (onClose, onMessage) -// correpond to the current connection iteration. This is important in case of reconnections. -protected sealed abstract class State[F[_]](val status: PersistentClientStatus) { - val connectionId: ConnectionId -} - -protected object State { - final case class Disconnected[F[_]](connectionId: ConnectionId) - extends State[F](PersistentClientStatus.Disconnected) - final case class Connecting[F[_]](connectionId: ConnectionId, latch: Latch[F]) - extends State[F](PersistentClientStatus.Connecting) - final case class Connected[F[_]]( - connectionId: ConnectionId, - connection: WebSocketConnection[F] - ) extends State[F](PersistentClientStatus.Connected) - final case class Initializing[F[_]]( - connectionId: ConnectionId, - connection: WebSocketConnection[F], - subscriptions: Map[String, Emitter[F]], - initPayload: F[Map[String, Json]], - latch: Latch[F] - ) extends State[F](PersistentClientStatus.Initializing) - final case class Initialized[F[_]]( - connectionId: ConnectionId, - connection: WebSocketConnection[F], - subscriptions: Map[String, Emitter[F]], - initPayload: F[Map[String, Json]] - ) extends State[F](PersistentClientStatus.Initialized) - // Reestablishing = We are in the process of reconnecting + reinitializing after a low level error/close, but we haven't connected yet. - final case class Reestablishing[F[_]]( - connectionId: ConnectionId, - subscriptions: Map[String, Emitter[F]], - initPayload: F[Map[String, Json]], - connectLatch: Latch[F], - initLatch: Latch[F] - ) extends State[F](PersistentClientStatus.Connecting) -} class ApolloClient[F[_], P, S]( connectionParams: P, @@ -106,66 +60,24 @@ class ApolloClient[F[_], P, S]( override def statusStream: fs2.Stream[F, PersistentClientStatus] = connectionStatus.discrete - override def connect(): F[Unit] = { + override def connect(): F[Unit] = connect(Map.empty.pure[F]) + + override def connect(payload: F[Map[String, Json]]): F[Unit] = { val warn = "connect() called while already connected or attempting to connect.".warnF Latch[F].flatMap { newLatch => stateModify { - case Disconnected(connectionId) => - Connecting(connectionId, newLatch) -> doConnect(connectionId, newLatch) - case s @ Connecting(_, latch) => + case Disconnected(connectionId) => + Connecting(connectionId, none, payload, Map.empty, newLatch) -> + doConnect(connectionId) + case s @ Connecting(_, _, _, _, latch) => s -> (warn >> latch.resolve) - case s @ Reestablishing(_, _, _, connectLatch, _) => - s -> (warn >> connectLatch.resolve) - case state => state -> warn - } - } - } - - override def initialize(payload: F[Map[String, Json]] = F.pure(Map.empty)): F[Unit] = { - val error = InvalidInvocationException("initialize() called while disconnected.").logAndRaiseF - val warn = - "initialize() called while already attempting to initialize (or reestablishing).".warnF - - Latch[F].flatMap { newLatch => - stateModify { - case s @ (Disconnected(_) | Connecting(_, _)) => s -> error - case Connected(connectionId, connection) => - Initializing(connectionId, connection, Map.empty, payload, newLatch) -> - doInitialize(payload, connection, newLatch) - case s @ Initializing(_, _, _, _, latch) => s -> (warn >> latch.resolve) - case s @ Reestablishing(_, _, _, _, initLatch) => s -> (warn >> initLatch.resolve) - case Initialized(connectionId, connection, subscriptions, _) => - Initializing(connectionId, connection, subscriptions, payload, newLatch) -> - (stopSubscriptions(connection, subscriptions) >> - doInitialize(payload, connection, newLatch)).uncancelable + case s => + s -> warn } } } - override def terminate(): F[Unit] = { - val error = InvalidInvocationException("terminate() called while uninitialized.").logAndRaiseF - val warn = "terminate() called while initializing.".warnF - - stateModify { - case Initialized(connectionId, connection, subscriptions, _) => - Connected(connectionId, connection) -> - (for { - t <- gracefulTerminate(connection, subscriptions).start - h <- haltSubscriptions(subscriptions).start - _ <- t.join - _ <- h.join - } yield ()) - case s @ Initializing(_, _, _, _, latch) => - s -> (warn >> latch.resolve >> terminate()) - case s @ Reestablishing(_, _, _, _, initLatch) => - s -> (warn >> initLatch.resolve >> terminate()) - case s => - s -> error - } - // .uncancelable // TODO We have waiting effects, we need to handle interruptions. - } - final def disconnect(closeParameters: CloseParams): F[Unit] = disconnectInternal( closeParameters.some ) @@ -180,48 +92,25 @@ class ApolloClient[F[_], P, S]( // We *could* wait for onClose to be invoked before completing, but is there a point to that? stateModify { - case Connecting(connectionId, latch) => + case Connecting(connectionId, connection, _, _, latch) => // We need a wait for the connection to establish and then disconnect it, without blocking the client. Disconnected(connectionId.next) -> - latch.error(interruptedError) // >> TODO wait in background to complete and close - case Connected(connectionId, connection) => - Disconnected(connectionId.next) -> connection.closeInternal(closeParameters) - case Initializing(connectionId, connection, _, _, latch) => - Disconnected(connectionId.next) -> - (latch.error(interruptedError) >> connection.closeInternal(closeParameters)) - case Initialized(connectionId, connection, _, _) => - Disconnected(connectionId.next) -> connection.closeInternal(closeParameters) - case Reestablishing(connectionId, _, _, connectLatch, initLatch) => + (latch.error(interruptedError) >> + connection + .map(_.closeInternal(closeParameters)) + .getOrElse(F.unit)) // >> TODO wait in background to complete and close + case Connected(connectionId, connection, _, subscriptions) => Disconnected(connectionId.next) -> - (connectLatch.error(interruptedError) >> - initLatch.error(interruptedError)) // >> TODO wait in background to complete and close - case s => s -> error + ( + (gracefulTerminate(connection, subscriptions), + haltSubscriptions(subscriptions) + ).parTupled >> + connection.closeInternal(closeParameters) + ) + case s => s -> error }.uncancelable } - override def reestablish(): F[Unit] = - Latch[F].flatMap { newConnectLatch => - Latch[F].flatMap { newInitLatch => - stateModify { - case s @ Reestablishing(_, _, _, _, initLatch) => - s -> (s"reestablish() called while already reestablishing.".errorF >> initLatch.resolve) - case Initialized(connectionId, connection, subscriptions, initPayload) => - Reestablishing( - connectionId.next, - subscriptions, - initPayload, - newConnectLatch, - newInitLatch - ) -> - ((gracefulTerminate(connection, subscriptions) >> connection.close()).start >> - doConnect(connectionId.next, newConnectLatch) >> - newInitLatch.resolve) - case s @ _ => - s -> s"reestablish() called while disconnected or uninitialized.".errorF - } - } - } - // override protected def subscribeInternal[D: Decoder, R]( subscription: String, @@ -258,25 +147,24 @@ class ApolloClient[F[_], P, S]( ServerMessageDecodingException(e).logAndRaiseF case Right(StreamingMessage.FromServer.ConnectionAck) => stateModify { - case Initializing(stateConnectionId, connection, subscriptions, initPayload, latch) + case s @ Connecting(stateConnectionId, _, _, _, latch) if connectionId === stateConnectionId => - Initialized(connectionId, connection, subscriptions, initPayload) -> - (startSubscriptions(connection, subscriptions) >> latch.release) + s -> latch.release case s => s -> s"Unexpected connection_ack received from server.".warnF } case Right(StreamingMessage.FromServer.ConnectionError(payload)) => stateModify { - case Initializing(stateConnectionId, connection, _, _, latch) + case s @ Connecting(stateConnectionId, _, _, _, latch) if connectionId === stateConnectionId => // We don't disconnect here. According to spec: - // "It server (sic) also respond with this message in case of a parsing errors of the message (which does not disconnect the client, just ignore the message)." - Connected(connectionId, connection) -> + // "It server (sic) also respond with this message in case of a parsing errors of the message (which does not disconnect the client, just ignore the message)." + s -> latch.error(RemoteInitializationException(payload)).void case s => s -> s"Unexpected connection_error received from server.".warnF } case Right(msg @ StreamingMessage.FromServer.Data(subscriptionId, response)) => state.get.flatMap { - case Initialized(stateConnectionId, _, subscriptions, _) + case Connected(stateConnectionId, _, _, subscriptions) if connectionId === stateConnectionId => subscriptions.get(subscriptionId) match { case None => @@ -293,7 +181,7 @@ class ApolloClient[F[_], P, S]( // TODO Contemplate different states. case Right(msg @ StreamingMessage.FromServer.Error(subscriptionId, payload)) => state.get.flatMap { - case Initialized(stateConnectionId, _, subscriptions, _) + case Connected(stateConnectionId, _, _, subscriptions) if connectionId === stateConnectionId => subscriptions.get(subscriptionId) match { case None => @@ -310,24 +198,21 @@ class ApolloClient[F[_], P, S]( } case Right(StreamingMessage.FromServer.Complete(subscriptionId)) => state.get.flatMap { - case Initialized(stateConnectionId, _, subscriptions, _) + case Connected(stateConnectionId, _, _, subscriptions) if connectionId === stateConnectionId => subscriptions.get(subscriptionId) match { case None => F.unit case Some(subscription) => subscription.halt } // Next 3 cases are expected. Server will send complete packages for subscriptions shut down when reestablishing/reinitializing. - case Reestablishing(stateConnectionId, _, _, _, _) - if connectionId =!= stateConnectionId => - F.unit - case Initializing(_, _, _, _, _) => + case Connecting(stateConnectionId, _, _, _, _) if connectionId =!= stateConnectionId => F.unit - case Initialized(stateConnectionId, _, _, _) if connectionId =!= stateConnectionId => + case Connected(stateConnectionId, _, _, _) if connectionId =!= stateConnectionId => F.unit - case s @ Disconnected(_) => + case s @ Disconnected(_) => s"Complete RECEIVED for subscription [$subscriptionId] on Disconnected state.".debugF >> s" \\-- State Is: [$s]".traceF - case s @ _ => + case s @ _ => s"UNEXPECTED Complete RECEIVED for subscription [$subscriptionId].".warnF >> s" \\-- State Is: [$s]".traceF } @@ -343,78 +228,42 @@ class ApolloClient[F[_], P, S]( reconnectionStrategy(0, event.asRight) match { case None => stateModify { - case s @ Disconnected(_) => + case s @ Disconnected(_) => s -> s"onClose() called while disconnected.".debugF - case Connecting(stateConnectionId, latch) if connectionId === stateConnectionId => - Disconnected(connectionId.next) -> latch.error(error) - case Reestablishing(stateConnectionId, _, _, connectLatch, initLatch) - if connectionId === stateConnectionId => - Disconnected(connectionId.next) -> - (connectLatch.error(error) >> initLatch.error(error)) - case Initializing(stateConnectionId, _, _, _, latch) + case Connecting(stateConnectionId, _, _, _, latch) if connectionId === stateConnectionId => Disconnected(connectionId.next) -> latch.error(error) - case Connected(stateConnectionId, _) if connectionId === stateConnectionId => - Disconnected(connectionId.next) -> F.unit - case Initialized(stateConnectionId, _, _, _) if connectionId === stateConnectionId => + case Connected(stateConnectionId, _, _, _) if connectionId === stateConnectionId => Disconnected(connectionId.next) -> F.unit - case s @ _ => + case s @ _ => s -> debug } case Some(wait) => - Latch[F].flatMap { newConnectLatch => - Latch[F].flatMap { newInitLatch => - def waitAndConnect(nextConnectionId: ConnectionId, latch: Latch[F]): F[Unit] = - s"Connection closed with event [$event]. Attempting to reconnect.".warnF >> - s"Waiting [$wait] before reconnect...".debugF >> - timer.sleep(wait) >> - doConnect(nextConnectionId, latch) - - stateModify { - case s @ Disconnected(stateConnectionId) if connectionId === stateConnectionId => - s -> s"Unexpected onClose() called while disconnected. Not applying reconnectStrategy.".warnF - case Connecting(stateConnectionId, connectLatch) - if connectionId === stateConnectionId => - Connecting(connectionId.next, connectLatch) -> - waitAndConnect(connectionId.next, connectLatch) - case Connected(stateConnectionId, _) if connectionId === stateConnectionId => - Connecting(connectionId.next, newConnectLatch) -> - waitAndConnect(connectionId.next, newConnectLatch) - case Initializing(stateConnectionId, _, subscriptions, initPayload, latch) - if connectionId === stateConnectionId => - Reestablishing( - connectionId.next, - subscriptions, - initPayload, - newConnectLatch, - latch - ) -> waitAndConnect(connectionId.next, newConnectLatch) - case Initialized(stateConnectionId, _, subscriptions, initPayload) - if connectionId === stateConnectionId => - Reestablishing( - connectionId.next, - subscriptions, - initPayload, - newConnectLatch, - newInitLatch - ) -> waitAndConnect(connectionId.next, newConnectLatch) - case Reestablishing( - stateConnectionId, - subscriptions, - initPayload, - connectLatch, - initLatch - ) if connectionId === stateConnectionId => - Reestablishing( - connectionId.next, - subscriptions, - initPayload, - connectLatch, - initLatch - ) -> waitAndConnect(connectionId.next, connectLatch) - case s @ _ => - s -> debug - } + Latch[F].flatMap { newLatch => + def waitAndConnect(nextConnectionId: ConnectionId): F[Unit] = + s"Connection closed with event [$event]. Attempting to reconnect.".warnF >> + s"Waiting [$wait] before reconnect...".debugF >> + timer.sleep(wait) >> + doConnect(nextConnectionId, attempt = 1) + + stateModify { + case s @ Disconnected(stateConnectionId) if connectionId === stateConnectionId => + s -> s"Unexpected onClose() called while disconnected. Not applying reconnectStrategy.".warnF + case Connecting(stateConnectionId, _, initPayload, subscriptions, connectLatch) + if connectionId === stateConnectionId => + Connecting(connectionId.next, none, initPayload, subscriptions, connectLatch) -> + waitAndConnect(connectionId.next) + case Connected(stateConnectionId, _, initPayload, subscriptions) + if connectionId === stateConnectionId => + Connecting( + connectionId.next, + none, + initPayload, + subscriptions, + newLatch + ) -> waitAndConnect(connectionId.next) + case s @ _ => + s -> debug } } } @@ -422,101 +271,98 @@ class ApolloClient[F[_], P, S]( // // - private def doConnect( - connectionId: ConnectionId, - latch: Latch[F], - attempt: Int = 1 - ): F[Unit] = - backend - .connect(connectionParams, this, connectionId) - .attempt - .flatMap { connection => - def retry(t: Throwable, wait: FiniteDuration, nextConnectionId: ConnectionId): F[Unit] = - t.warnF(s"Error in connect() after attempt #[$attempt]. Retrying.") >> + private def handleRetry( + t: Throwable, + oldConnection: Option[WebSocketConnection[F]], + nextConnectionId: ConnectionId, + payload: F[Map[String, Json]], + subscriptions: Map[String, Emitter[F]], + newLatch: Latch[F], + attempt: Int + ): (State[F], F[Unit]) = { + val disconnect: F[Unit] = oldConnection.map(_.closeInternal(none).start.void).getOrElse(F.unit) + + reconnectionStrategy(attempt, t.asLeft) match { + case None => + Disconnected(nextConnectionId) -> + (disconnect >> t.logAndRaiseF) + case Some(wait) => + Connecting(nextConnectionId, none, payload, subscriptions, newLatch) -> + (t.warnF(s"Error in connect() after attempt #[$attempt]. Retrying.") >> s"Waiting [$wait] before reconnect...".debugF >> + disconnect >> timer.sleep(wait) >> - doConnect(nextConnectionId, latch, attempt + 1) + doConnect(nextConnectionId, attempt + 1)) + } + } - stateModify { - case Connecting(connectionId, latch) => - connection match { - case Left(t) => - reconnectionStrategy(attempt, t.asLeft) match { - case None => - Disconnected(connectionId.next) -> - (latch.error(t) >> F.raiseError(t)).void - case Some(wait) => - Connecting(connectionId.next, latch) -> retry(t, wait, connectionId.next) - } - case Right(c) => Connected(connectionId, c) -> latch.release - } - case Reestablishing(connectionId, subscriptions, initPayload, connectLatch, initLatch) => - connection match { - case Left(t) => - reconnectionStrategy(attempt, t.asLeft) match { - case None => - Disconnected(connectionId.next) -> - (latch.error(t) >> initLatch.error(t) >> F.raiseError(t)).void - case Some(wait) => - Reestablishing( - connectionId.next, - subscriptions, - initPayload, - connectLatch, - initLatch - ) -> retry(t, wait, connectionId.next) - } - case Right(c) => - Initializing(connectionId, c, subscriptions, initPayload, initLatch) -> - (latch.release >> doInitialize(initPayload, c, initLatch)) - } - case s => - s -> ( - s"Unexpected state in connect(). State Is: [$s]".traceF >> - (latch.complete(connection.void.some) >> - InvalidInvocationException( - s"Unexpected state in connect(). Unblocking clients, but state may be inconsistent." - ).logAndRaiseF) - ) - } - } - .guaranteeCase { - case Outcome.Succeeded(_) | Outcome.Errored(_) => F.unit - case Outcome.Canceled() => // Cleanup. + private def doConnect(connectionId: ConnectionId, attempt: Int = 1): F[Unit] = + s"Connecting. Attempt: [$attempt].".traceF >> + backend + .connect(connectionParams, this, connectionId) + .attempt + .flatMap { connectionAttempt => stateModify { - case s @ Connected(_, _) => s -> latch.release - case Connecting(connectionId, _) => - // TODO Cleanup the web socket. We should call .close() on it once it's connected. But we have to keep track of it. - Disconnected(connectionId.next) -> latch.cancel - case Reestablishing(connectionId, _, _, _, initLatch) => - // TODO Cleanup the web socket. We should call .close() on it once it's connected. But we have to keep track of it. - Disconnected(connectionId.next) -> - (latch.cancel >> initLatch.cancel) - case s => - s -> UnexpectedInternalStateException("cancelling connect()", s).logAndRaiseF + case Connecting(connectionId, None, payload, subscriptions, latch) => + connectionAttempt match { + case Left(t) => + handleRetry(t, none, connectionId.next, payload, subscriptions, latch, attempt) + case Right(connection) => + Connecting(connectionId, connection.some, payload, subscriptions, latch) -> + doInitialize(connection, payload, latch, attempt) + } + case s => + s -> (s"Unexpected state in connect().".errorF >> s"State Is: [$s]".traceF >> + InvalidInvocationException( + s"Unexpected state in connect(). Unblocking clients, but state may be inconsistent." + ).raiseF) } - } + } + .guaranteeCase { + case Outcome.Succeeded(_) | Outcome.Errored(_) => F.unit + case Outcome.Canceled() => disconnect().start.void // Cleanup + } private def doInitialize( - payload: F[Map[String, Json]], connection: WebSocketConnection[F], - latch: Latch[F] - ): F[Unit] = (for { - _ <- payload >>= (p => connection.send(StreamingMessage.FromClient.ConnectionInit(p))) - _ <- latch.resolve - } yield ()).guaranteeCase { - case Outcome.Succeeded(_) | Outcome.Errored(_) => F.unit - case Outcome.Canceled() => // Cleanup. - stateModify { - case s @ Initializing(_, _, _, _, _) => - s -> - (disconnect().start >> latch.cancel) - case s => - s -> - (disconnect().start >> - UnexpectedInternalStateException("cancelling initialize()", s).logAndRaiseF) + payload: F[Map[String, Json]], + latch: Latch[F], + attempt: Int + ): F[Unit] = + (for { + p <- payload + _ <- s"Initializing. Attempt: [$attempt]. Payload: [$p].".traceF + _ <- connection.send(StreamingMessage.FromClient.ConnectionInit(p)) + result <- latch.resolve.attempt // Sync up with server response. + newLatch <- Latch[F] + _ <- stateModify { + case Connecting(connectionId, Some(connection), payload, subscriptions, _) => + result match { + case Left(t) => + handleRetry( + t, + connection.some, + connectionId.next, + payload, + subscriptions, + newLatch, + attempt + ) + case Right(_) => + Connected(connectionId, connection, payload, subscriptions) -> + startSubscriptions(connection, subscriptions) + } + case s => + s -> (s"Unexpected state when initializing.".errorF >> s"State Is: [$s]".traceF >> + InvalidInvocationException( + s"Unexpected state when initializing. State may be inconsistent." + ).raiseF) + } + } yield ()) + .guaranteeCase { + case Outcome.Succeeded(_) | Outcome.Errored(_) => F.unit + case Outcome.Canceled() => disconnect().start.void // Cleanup. } - } private def gracefulTerminate( connection: WebSocketConnection[F], @@ -553,16 +399,16 @@ class ApolloClient[F[_], P, S]( private def haltSubscription(subscriptionId: String): F[Unit] = s"Halting subscription [$subscriptionId]".debugF >> state.get.flatMap { - case Initialized(_, _, subscriptions, _) => + case Connected(_, _, _, subscriptions) => for { _ <- s"Current subscriptions: [${subscriptions.keySet}]".traceF _ <- subscriptions.get(subscriptionId) match { case None => - F.raiseError(new InvalidSubscriptionOperationException("stop", subscriptionId)) + (new InvalidSubscriptionOperationException("stop", subscriptionId)).raiseF case Some(subscription) => subscription.halt } } yield () - case s @ _ => + case s @ _ => InvalidSubscriptionOperationException("stop", subscriptionId).logAndRaiseF } @@ -631,27 +477,18 @@ class ApolloClient[F[_], P, S]( errorPolicy: ErrorPolicyProcessor[D, R] ): F[GraphQLSubscription[F, R]] = state.get.flatMap { - case Initialized(_, _, _, _) => + case Connected(_, _, _, _) => val request = GraphQLRequest(subscription, operationName, variables) buildQueue[D](request).flatMap { case (id, emitter) => def acquire: F[Unit] = s"Acquiring queue for subscription [$id]".traceF >> stateModify { - case Initialized(cid, c, subscriptions, i) => - Initialized(cid, c, subscriptions + (id -> emitter), i) -> F.unit - case s @ Initializing(_, _, _, _, latch) => + case Connected(cid, c, i, subscriptions) => + Connected(cid, c, i, subscriptions + (id -> emitter)) -> F.unit + case s @ Connecting(_, _, _, _, latch) => s -> (latch.resolve >> acquire) - case Reestablishing(cid, subscriptions, i, connectLatch, initLatch) => - Reestablishing( - cid, - subscriptions + (id -> emitter), - i, - connectLatch, - initLatch - ) -> - F.unit - case s @ _ => + case s @ _ => s -> InvalidSubscriptionOperationException("acquire queue", id).logAndRaiseF } @@ -659,31 +496,23 @@ class ApolloClient[F[_], P, S]( def release: F[Unit] = s"Releasing queue for subscription[$id]".traceF >> stateModify { - case Initialized(cid, c, subscriptions, i) => - Initialized(cid, c, subscriptions - id, i) -> F.unit - case s @ Initializing(_, _, _, _, latch) => + case Connected(cid, c, i, subscriptions) => + Connected(cid, c, i, subscriptions - id) -> F.unit + case s @ Connecting(_, _, _, _, latch) => s -> (latch.resolve >> release) - case Reestablishing(cid, subscriptions, i, connectLatch, initLatch) => - Reestablishing(cid, subscriptions - id, i, connectLatch, initLatch) -> - F.unit - case s @ (Connected(_, _) | Disconnected(_)) => - // It's OK to call release when Connected or Disconnected. - // It may happen if protocol was terminated or client disconnected and we are halting streams. + case s @ Disconnected(_) => + // It's OK to call release when Disconnected. + // It may happen if client is disconnected and we are halting streams. s -> F.unit - case s @ _ => - s -> - InvalidSubscriptionOperationException("release queue", id).logAndRaiseF } def sendStart: F[Unit] = state.get.flatMap { // The connection may have changed since we created the subscription, so we re-get it. - case Initialized(_, currentConnection, _, _) => + case Connected(_, currentConnection, _, _) => currentConnection.send(StreamingMessage.FromClient.Start(id, request)) - case Initializing(_, _, _, _, latch) => + case Connecting(_, _, _, _, latch) => latch.resolve >> sendStart - case Reestablishing(_, _, _, _, initLatch) => - initLatch.resolve >> sendStart - case s @ _ => + case s @ _ => InvalidSubscriptionOperationException("send start", id).logAndRaiseF } @@ -703,22 +532,19 @@ class ApolloClient[F[_], P, S]( (acquire >> sendStart).as(createSubscription(stream, id)) } - case Initializing(_, _, _, _, latch) => + case Connecting(_, _, _, _, latch) => latch.resolve >> startSubscription(subscription, operationName, variables, errorPolicy) - case Reestablishing(_, _, _, _, initLatch) => - initLatch.resolve >> - startSubscription(subscription, operationName, variables, errorPolicy) - case _ => + case _ => ConnectionNotInitializedException.logAndRaiseF_ } private def sendStop(subscriptionId: String): F[Unit] = state.get.flatMap { // The connection may have changed since we created the subscription, so we re-get it. - case Initialized(_, currentConnection, _, _) => + case Connected(_, currentConnection, _, _) => currentConnection.send(StreamingMessage.FromClient.Stop(subscriptionId)) - case s @ _ => + case s @ _ => InvalidSubscriptionOperationException("send stop", subscriptionId).logAndRaiseF } diff --git a/core/src/main/scala/clue/websocket/Emitter.scala b/core/src/main/scala/clue/websocket/Emitter.scala new file mode 100644 index 00000000..bc2e4650 --- /dev/null +++ b/core/src/main/scala/clue/websocket/Emitter.scala @@ -0,0 +1,16 @@ +// Copyright (c) 2016-2023 Association of Universities for Research in Astronomy, Inc. (AURA) +// For license information see LICENSE or https://opensource.org/licenses/BSD-3-Clause + +package clue.websocket + +import clue.model.* +import io.circe.* + +// Internal structure to emit data and errors to the client. +protected[clue] trait Emitter[F[_]] { + val request: GraphQLRequest[JsonObject] + + def emitData(response: GraphQLResponse[Json]): F[Unit] + def emitErrors(errors: GraphQLErrors): F[Unit] + val halt: F[Unit] +} diff --git a/core/src/main/scala/clue/websocket/State.scala b/core/src/main/scala/clue/websocket/State.scala new file mode 100644 index 00000000..7536f360 --- /dev/null +++ b/core/src/main/scala/clue/websocket/State.scala @@ -0,0 +1,34 @@ +// Copyright (c) 2016-2023 Association of Universities for Research in Astronomy, Inc. (AURA) +// For license information see LICENSE or https://opensource.org/licenses/BSD-3-Clause + +package clue.websocket + +import clue.* +import io.circe.* + +// Client internal state for the FSM. +// We keep a connectionId throughout all states to ensure that callback events (onClose, onMessage) +// correpond to the current connection iteration. This is important in case of reconnections. +protected sealed abstract class State[F[_]](val status: PersistentClientStatus) { + val connectionId: ConnectionId +} + +protected object State { + final case class Disconnected[F[_]](connectionId: ConnectionId) + extends State[F](PersistentClientStatus.Disconnected) + + final case class Connecting[F[_]]( + connectionId: ConnectionId, + connection: Option[WebSocketConnection[F]], + initPayload: F[Map[String, Json]], + subscriptions: Map[String, Emitter[F]], + latch: Latch[F] + ) extends State[F](PersistentClientStatus.Connecting) + + final case class Connected[F[_]]( + connectionId: ConnectionId, + connection: WebSocketConnection[F], + initPayload: F[Map[String, Json]], + subscriptions: Map[String, Emitter[F]] + ) extends State[F](PersistentClientStatus.Connected) +} diff --git a/http4s-jdk-demo/src/main/resources/simplelogger.properties b/http4s-jdk-demo/src/main/resources/simplelogger.properties new file mode 100644 index 00000000..ba270bfe --- /dev/null +++ b/http4s-jdk-demo/src/main/resources/simplelogger.properties @@ -0,0 +1 @@ +org.slf4j.simpleLogger.defaultLogLevel=trace \ No newline at end of file diff --git a/http4s-jdk-demo/src/main/scala/clue/http4sjdkDemo/Demo.scala b/http4s-jdk-demo/src/main/scala/clue/http4sjdkDemo/Demo.scala index 62006a39..50e481d4 100644 --- a/http4s-jdk-demo/src/main/scala/clue/http4sjdkDemo/Demo.scala +++ b/http4s-jdk-demo/src/main/scala/clue/http4sjdkDemo/Demo.scala @@ -36,7 +36,7 @@ object Demo extends IOApp.Simple { object Query extends GraphQLOperation.Typed.NoInput[DemoDB, Json] { override val document: String = """ |query { - | observations(WHERE: {programId: {EQ: "p-2"}}) { + | observations(WHERE: {program: {id: {EQ: "p-2"}}}) { | matches { | id | title @@ -75,14 +75,22 @@ object Demo extends IOApp.Simple { def withLogger[F[_]: Sync]: Resource[F, Logger[F]] = Resource.make(Slf4jLogger.create[F])(_ => Applicative[F].unit) + def initPayload[F[_]: Sync]: F[Map[String, Json]] = + Sync[F] + .delay(sys.env.get("ODB_SERVICE_JWT")) + .map( + _.map(token => Map("Authorization" -> Json.fromString(s"Bearer $token"))) + .getOrElse(Map.empty) + ) + def withStreamingClient[F[_]: Async: Logger]: Resource[F, WebSocketClient[F, DemoDB]] = for { client <- Resource.eval(JdkWSClient.simple) backend = Http4sWebSocketBackend(client) - uri = uri"wss://lucuma-odb-development.herokuapp.com/ws" + uri = uri"wss://lucuma-postgres-odb-dev.herokuapp.com/ws" sc <- Resource.eval(Http4sWebSocketClient.of[F, DemoDB](uri)(using Async[F], Logger[F], backend)) - _ <- Resource.make(sc.connect() >> sc.initialize())(_ => sc.terminate() >> sc.disconnect()) + _ <- Resource.make(sc.connect(initPayload))(_ => sc.disconnect()) } yield sc val allStatus =