diff --git a/engine/language-server/src/main/scala/org/enso/languageserver/protocol/json/JsonConnectionController.scala b/engine/language-server/src/main/scala/org/enso/languageserver/protocol/json/JsonConnectionController.scala index 025de33a04161..38a59e914194a 100644 --- a/engine/language-server/src/main/scala/org/enso/languageserver/protocol/json/JsonConnectionController.scala +++ b/engine/language-server/src/main/scala/org/enso/languageserver/protocol/json/JsonConnectionController.scala @@ -141,7 +141,7 @@ class JsonConnectionController( } override def receive: Receive = { - case JsonRpcServer.WebConnect(webActor) => + case JsonRpcServer.WebConnect(webActor, _) => unstashAll() context.become(connected(webActor)) case _ => stash() @@ -180,7 +180,7 @@ class JsonConnectionController( case Request(_, id, _) => sender() ! ResponseError(Some(id), SessionNotInitialisedError) - case MessageHandler.Disconnected => + case MessageHandler.Disconnected(_) => context.stop(self) } @@ -304,7 +304,7 @@ class JsonConnectionController( case Request(InitProtocolConnection, id, _) => sender() ! ResponseError(Some(id), SessionAlreadyInitialisedError) - case MessageHandler.Disconnected => + case MessageHandler.Disconnected(_) => logger.info("Json session terminated [{}].", rpcSession.clientId) context.system.eventStream.publish(JsonSessionTerminated(rpcSession)) context.stop(self) diff --git a/lib/scala/json-rpc-server/src/main/scala/org/enso/jsonrpc/JsonRpcServer.scala b/lib/scala/json-rpc-server/src/main/scala/org/enso/jsonrpc/JsonRpcServer.scala index 6c8e02408b869..d06dcec33ec36 100644 --- a/lib/scala/json-rpc-server/src/main/scala/org/enso/jsonrpc/JsonRpcServer.scala +++ b/lib/scala/json-rpc-server/src/main/scala/org/enso/jsonrpc/JsonRpcServer.scala @@ -36,11 +36,15 @@ class JsonRpcServer( implicit val ec: ExecutionContext = system.dispatcher - private def newUser(): Flow[Message, Message, NotUsed] = { + private def newUser(port: Int): Flow[Message, Message, NotUsed] = { val messageHandler = system.actorOf( Props( - new MessageHandlerSupervisor(clientControllerFactory, protocolFactory) + new MessageHandlerSupervisor( + clientControllerFactory, + protocolFactory, + port + ) ), s"message-handler-supervisor-${UUID.randomUUID()}" ) @@ -61,9 +65,11 @@ class JsonRpcServer( .to( Sink.actorRef[MessageHandler.WebMessage]( messageHandler, - MessageHandler.Disconnected, - { _: Any => - MessageHandler.Disconnected + MessageHandler.Disconnected(port), + { _: Throwable => + // TODO: If enabled, the warning would produce too much noise in tests + // logger.warn(s"Connection closed abruptly: ${e.getMessage}", e) + MessageHandler.Disconnected(port) } ) ) @@ -77,7 +83,7 @@ class JsonRpcServer( OverflowStrategy.fail ) .mapMaterializedValue { outActor => - messageHandler ! MessageHandler.Connected(outActor) + messageHandler ! MessageHandler.Connected(outActor, port) NotUsed } .map((outMsg: MessageHandler.WebMessage) => TextMessage(outMsg.message)) @@ -88,10 +94,10 @@ class JsonRpcServer( Flow.fromSinkAndSource(incomingMessages, outgoingMessages) } - private val route: Route = { + private def route(port: Int): Route = { val webSocketEndpoint = path(config.path) { - get { handleWebSocketMessages(newUser()) } + get { handleWebSocketMessages(newUser(port)) } } optionalEndpoints.foldLeft(webSocketEndpoint) { (chain, next) => @@ -109,7 +115,7 @@ class JsonRpcServer( def bind(interface: String, port: Int): Future[Http.ServerBinding] = Http() .newServerAt(interface, port) - .bind(route) + .bind(route(port)) } object JsonRpcServer { @@ -138,6 +144,6 @@ object JsonRpcServer { Config(outgoingBufferSize = 1000, lazyMessageTimeout = 10.seconds) } - case class WebConnect(webActor: ActorRef) + case class WebConnect(webActor: ActorRef, port: Int) } diff --git a/lib/scala/json-rpc-server/src/main/scala/org/enso/jsonrpc/MessageHandler.scala b/lib/scala/json-rpc-server/src/main/scala/org/enso/jsonrpc/MessageHandler.scala index a71b48947f096..e28950e012d29 100644 --- a/lib/scala/json-rpc-server/src/main/scala/org/enso/jsonrpc/MessageHandler.scala +++ b/lib/scala/json-rpc-server/src/main/scala/org/enso/jsonrpc/MessageHandler.scala @@ -20,7 +20,7 @@ class MessageHandler(protocolFactory: ProtocolFactory, controller: ActorRef) * @return the actor behavior. */ override def receive: Receive = { - case MessageHandler.Connected(webConnection) => + case MessageHandler.Connected(webConnection, _) => unstashAll() context.become(established(webConnection, Map())) case _ => stash() @@ -38,8 +38,8 @@ class MessageHandler(protocolFactory: ProtocolFactory, controller: ActorRef) ): Receive = { case MessageHandler.WebMessage(msg) => handleWebMessage(msg, webConnection, awaitingResponses) - case MessageHandler.Disconnected => - controller ! MessageHandler.Disconnected + case MessageHandler.Disconnected(port) => + controller ! MessageHandler.Disconnected(port) context.stop(self) case request: Request[Method, Any] => issueRequest(request, webConnection, awaitingResponses) @@ -192,10 +192,10 @@ object MessageHandler { /** A control message used for [[MessageHandler]] initializations * @param webConnection the actor representing the web. */ - case class Connected(webConnection: ActorRef) + case class Connected(webConnection: ActorRef, port: Int) /** A control message used to notify the controller about * the connection being closed. */ - case object Disconnected + case class Disconnected(port: Int) } diff --git a/lib/scala/json-rpc-server/src/main/scala/org/enso/jsonrpc/MessageHandlerSupervisor.scala b/lib/scala/json-rpc-server/src/main/scala/org/enso/jsonrpc/MessageHandlerSupervisor.scala index 90a1fedfb1e07..a4c8132338f71 100644 --- a/lib/scala/json-rpc-server/src/main/scala/org/enso/jsonrpc/MessageHandlerSupervisor.scala +++ b/lib/scala/json-rpc-server/src/main/scala/org/enso/jsonrpc/MessageHandlerSupervisor.scala @@ -19,7 +19,8 @@ import java.util.UUID */ final class MessageHandlerSupervisor( clientControllerFactory: ClientControllerFactory, - protocolFactory: ProtocolFactory + protocolFactory: ProtocolFactory, + port: Int ) extends Actor with LazyLogging with Stash { @@ -58,7 +59,7 @@ final class MessageHandlerSupervisor( Props(new MessageHandler(protocolFactory, clientActor)), s"message-handler-$clientId" ) - clientActor ! JsonRpcServer.WebConnect(messageHandler) + clientActor ! JsonRpcServer.WebConnect(messageHandler, port) context.become(initialized(messageHandler)) unstashAll() diff --git a/lib/scala/json-rpc-server/src/test/scala/org/enso/jsonrpc/MessageHandlerSpec.scala b/lib/scala/json-rpc-server/src/test/scala/org/enso/jsonrpc/MessageHandlerSpec.scala index 64f993e7e326c..3657592d38c48 100644 --- a/lib/scala/json-rpc-server/src/test/scala/org/enso/jsonrpc/MessageHandlerSpec.scala +++ b/lib/scala/json-rpc-server/src/test/scala/org/enso/jsonrpc/MessageHandlerSpec.scala @@ -95,7 +95,7 @@ class MessageHandlerSpec handler = system.actorOf( Props(new MessageHandler(MyProtocolFactory, controller.ref)) ) - handler ! Connected(out.ref) + handler ! Connected(out.ref, 0) } "Message handler" must { diff --git a/lib/scala/project-manager/src/main/resources/application.conf b/lib/scala/project-manager/src/main/resources/application.conf index f2b7302ed9d0d..f28e50971d55e 100644 --- a/lib/scala/project-manager/src/main/resources/application.conf +++ b/lib/scala/project-manager/src/main/resources/application.conf @@ -117,7 +117,7 @@ project-manager { request-timeout = 10 seconds boot-timeout = 40 seconds shutdown-timeout = 20 seconds - delayed-shutdown-timeout = 8 seconds + delayed-shutdown-timeout = 3 seconds socket-close-timeout = 15 seconds retries = 5 } diff --git a/lib/scala/project-manager/src/main/scala/org/enso/projectmanager/data/LanguageServerStatus.scala b/lib/scala/project-manager/src/main/scala/org/enso/projectmanager/data/LanguageServerStatus.scala new file mode 100644 index 0000000000000..525f903543b09 --- /dev/null +++ b/lib/scala/project-manager/src/main/scala/org/enso/projectmanager/data/LanguageServerStatus.scala @@ -0,0 +1,3 @@ +package org.enso.projectmanager.data + +case class LanguageServerStatus(open: Boolean, shuttingDown: Boolean) diff --git a/lib/scala/project-manager/src/main/scala/org/enso/projectmanager/data/RunningStatus.scala b/lib/scala/project-manager/src/main/scala/org/enso/projectmanager/data/RunningStatus.scala new file mode 100644 index 0000000000000..d64c13ce7ad28 --- /dev/null +++ b/lib/scala/project-manager/src/main/scala/org/enso/projectmanager/data/RunningStatus.scala @@ -0,0 +1,3 @@ +package org.enso.projectmanager.data + +case class RunningStatus(open: Boolean, shuttingDown: Boolean) diff --git a/lib/scala/project-manager/src/main/scala/org/enso/projectmanager/event/ClientEvent.scala b/lib/scala/project-manager/src/main/scala/org/enso/projectmanager/event/ClientEvent.scala index 03d42986b09a6..2c9f912fca114 100644 --- a/lib/scala/project-manager/src/main/scala/org/enso/projectmanager/event/ClientEvent.scala +++ b/lib/scala/project-manager/src/main/scala/org/enso/projectmanager/event/ClientEvent.scala @@ -11,14 +11,16 @@ object ClientEvent { /** Notifies the Language Server about a new client connecting. * * @param clientId an object representing a client + * @param port the port number to which the client connected */ - case class ClientConnected(clientId: UUID) extends ClientEvent + case class ClientConnected(clientId: UUID, port: Int) extends ClientEvent /** Notifies the Language Server about a client disconnecting. * The client may not send any further messages after this one. * * @param clientId the internal id of this client + * @param port the port number from which the client disconnected */ - case class ClientDisconnected(clientId: UUID) extends ClientEvent + case class ClientDisconnected(clientId: UUID, port: Int) extends ClientEvent } diff --git a/lib/scala/project-manager/src/main/scala/org/enso/projectmanager/infrastructure/languageserver/LanguageServerController.scala b/lib/scala/project-manager/src/main/scala/org/enso/projectmanager/infrastructure/languageserver/LanguageServerController.scala index bd2c5c7eccd07..c96208bf27b39 100644 --- a/lib/scala/project-manager/src/main/scala/org/enso/projectmanager/infrastructure/languageserver/LanguageServerController.scala +++ b/lib/scala/project-manager/src/main/scala/org/enso/projectmanager/infrastructure/languageserver/LanguageServerController.scala @@ -15,7 +15,10 @@ import nl.gn0s1s.bump.SemVer import org.enso.logger.akka.ActorMessageLogging import org.enso.projectmanager.boot.configuration._ import org.enso.projectmanager.data.{LanguageServerSockets, Socket} -import org.enso.projectmanager.event.ClientEvent.ClientDisconnected +import org.enso.projectmanager.event.ClientEvent.{ + ClientConnected, + ClientDisconnected +} import org.enso.projectmanager.event.ProjectEvent.ProjectClosed import org.enso.projectmanager.infrastructure.http.AkkaBasedWebSocketConnectionFactory import org.enso.projectmanager.infrastructure.languageserver.LanguageServerBootLoader.{ @@ -24,7 +27,11 @@ import org.enso.projectmanager.infrastructure.languageserver.LanguageServerBootL } import org.enso.projectmanager.infrastructure.languageserver.LanguageServerController._ import org.enso.projectmanager.infrastructure.languageserver.LanguageServerProtocol._ -import org.enso.projectmanager.infrastructure.languageserver.LanguageServerRegistry.ServerShutDown +import org.enso.projectmanager.infrastructure.languageserver.LanguageServerRegistry.{ + LanguageServerStatus, + LanguageServerStatusRequest, + ServerShutDown +} import org.enso.projectmanager.model.Project import org.enso.projectmanager.service.LoggingServiceDescriptor import org.enso.projectmanager.util.UnhandledLogging @@ -93,6 +100,7 @@ class LanguageServerController( override def preStart(): Unit = { context.system.eventStream.subscribe(self, classOf[ClientDisconnected]) + context.system.eventStream.subscribe(self, classOf[ClientConnected]) self ! Boot } @@ -160,12 +168,12 @@ class LanguageServerController( private def supervising( connectionInfo: LanguageServerConnectionInfo, serverProcessManager: ActorRef, - clients: Set[UUID] = Set.empty, - scheduledShutdown: Option[Cancellable] = None + clients: Set[UUID] = Set.empty, + scheduledShutdown: Option[(Cancellable, Int)] = None ): Receive = LoggingReceive.withLabel("supervising") { case StartServer(clientId, _, requestedEngineVersion, _, _) => - scheduledShutdown.foreach(_.cancel()) + scheduledShutdown.foreach(_._1.cancel()) if (requestedEngineVersion != engineVersion) { sender() ! ServerBootFailed( new IllegalStateException( @@ -192,7 +200,7 @@ class LanguageServerController( ) } case Terminated(_) => - scheduledShutdown.foreach(_.cancel()) + scheduledShutdown.foreach(_._1.cancel()) logger.debug("Bootloader for {} terminated.", project) case StopServer(clientId, _) => @@ -202,28 +210,49 @@ class LanguageServerController( clients, clientId, Some(sender()), + explicitShutdownRequested = true, + None, scheduledShutdown ) case ScheduledShutdown(requester) => shutDownServer(requester) + case LanguageServerStatusRequest => + sender() ! LanguageServerStatus(project.id, scheduledShutdown.isDefined) + case ShutDownServer => - scheduledShutdown.foreach(_.cancel()) + scheduledShutdown.foreach(_._1.cancel()) shutDownServer(None) - case ClientDisconnected(clientId) => + case ClientDisconnected(clientId, port) => removeClient( connectionInfo, serverProcessManager, clients, clientId, None, + explicitShutdownRequested = false, + atPort = Some(port), scheduledShutdown ) + case ClientConnected(clientId, clientPort) => + scheduledShutdown match { + case Some((cancellable, port)) if clientPort == port => + cancellable.cancel() + context.become( + supervising( + connectionInfo, + serverProcessManager, + clients ++ Set(clientId), + None + ) + ) + case _ => + } case RenameProject(_, namespace, oldName, newName) => - scheduledShutdown.foreach(_.cancel()) + scheduledShutdown.foreach(_._1.cancel()) val socket = Socket(connectionInfo.interface, connectionInfo.rpcPort) context.actorOf( ProjectRenameAction @@ -241,7 +270,7 @@ class LanguageServerController( ) case ServerDied => - scheduledShutdown.foreach(_.cancel()) + scheduledShutdown.foreach(_._1.cancel()) logger.error("Language server died [{}].", connectionInfo) context.stop(self) @@ -253,30 +282,39 @@ class LanguageServerController( clients: Set[UUID], clientId: UUID, maybeRequester: Option[ActorRef], - shutdownTimeout: Option[Cancellable] + explicitShutdownRequested: Boolean, + atPort: Option[Int], + shutdownTimeout: Option[(Cancellable, Int)] ): Unit = { val updatedClients = clients - clientId if (updatedClients.isEmpty) { - logger.debug("Delaying shutdown for project {}.", project.id) - val scheduledShutdown = - shutdownTimeout.orElse( - Some( - context.system.scheduler - .scheduleOnce( - timeoutConfig.delayedShutdownTimeout, - self, - ScheduledShutdown(maybeRequester) + if (!explicitShutdownRequested) { + logger.debug("Delaying shutdown for project {}.", project.id) + val scheduledShutdown: Option[(Cancellable, Int)] = + shutdownTimeout.orElse( + Some( + ( + context.system.scheduler.scheduleOnce( + timeoutConfig.delayedShutdownTimeout, + self, + ScheduledShutdown(maybeRequester) + ), + atPort.getOrElse(0) ) + ) + ) + context.become( + supervising( + connectionInfo, + serverProcessManager, + Set.empty, + scheduledShutdown ) ) - context.become( - supervising( - connectionInfo, - serverProcessManager, - Set.empty, - scheduledShutdown - ) - ) + } else { + shutdownTimeout.foreach(_._1.cancel()) + shutDownServer(maybeRequester) + } } else { sender() ! CannotDisconnectOtherClients context.become( @@ -329,7 +367,7 @@ class LanguageServerController( maybeRequester.foreach(_ ! ServerShutdownTimedOut) stop() - case ClientDisconnected(clientId) => + case ClientDisconnected(clientId, _) => logger.debug( s"Received client ($clientId) disconnect request during shutdown. Ignoring." ) diff --git a/lib/scala/project-manager/src/main/scala/org/enso/projectmanager/infrastructure/languageserver/LanguageServerGateway.scala b/lib/scala/project-manager/src/main/scala/org/enso/projectmanager/infrastructure/languageserver/LanguageServerGateway.scala index f28eeed5e5676..272c39f2d0964 100644 --- a/lib/scala/project-manager/src/main/scala/org/enso/projectmanager/infrastructure/languageserver/LanguageServerGateway.scala +++ b/lib/scala/project-manager/src/main/scala/org/enso/projectmanager/infrastructure/languageserver/LanguageServerGateway.scala @@ -54,7 +54,7 @@ trait LanguageServerGateway[F[+_, +_]] { * @param projectId a project id * @return true if project is open */ - def isRunning(projectId: UUID): F[CheckTimeout.type, Boolean] + def isRunning(projectId: UUID): F[CheckTimeout.type, (Boolean, Boolean)] /** Request a language server to rename project. * diff --git a/lib/scala/project-manager/src/main/scala/org/enso/projectmanager/infrastructure/languageserver/LanguageServerGatewayImpl.scala b/lib/scala/project-manager/src/main/scala/org/enso/projectmanager/infrastructure/languageserver/LanguageServerGatewayImpl.scala index 400d60df80932..f9797e9cd02f4 100644 --- a/lib/scala/project-manager/src/main/scala/org/enso/projectmanager/infrastructure/languageserver/LanguageServerGatewayImpl.scala +++ b/lib/scala/project-manager/src/main/scala/org/enso/projectmanager/infrastructure/languageserver/LanguageServerGatewayImpl.scala @@ -85,12 +85,14 @@ class LanguageServerGatewayImpl[ } /** @inheritdoc */ - override def isRunning(projectId: UUID): F[CheckTimeout.type, Boolean] = { + override def isRunning( + projectId: UUID + ): F[CheckTimeout.type, (Boolean, Boolean)] = { implicit val timeout: Timeout = Timeout(timeoutConfig.requestTimeout) Async[F] .fromFuture { () => - (registry ? CheckIfServerIsRunning(projectId)).mapTo[Boolean] + (registry ? CheckIfServerIsRunning(projectId)).mapTo[(Boolean, Boolean)] } .mapError(_ => CheckTimeout) } diff --git a/lib/scala/project-manager/src/main/scala/org/enso/projectmanager/infrastructure/languageserver/LanguageServerRegistry.scala b/lib/scala/project-manager/src/main/scala/org/enso/projectmanager/infrastructure/languageserver/LanguageServerRegistry.scala index bf371d3805d15..487c129c1a836 100644 --- a/lib/scala/project-manager/src/main/scala/org/enso/projectmanager/infrastructure/languageserver/LanguageServerRegistry.scala +++ b/lib/scala/project-manager/src/main/scala/org/enso/projectmanager/infrastructure/languageserver/LanguageServerRegistry.scala @@ -1,17 +1,15 @@ package org.enso.projectmanager.infrastructure.languageserver -import akka.actor.{Actor, ActorRef, Props, Terminated} +import akka.actor.{Actor, ActorRef, Cancellable, Props, Terminated} import com.typesafe.scalalogging.LazyLogging import org.enso.projectmanager.boot.configuration._ import org.enso.projectmanager.infrastructure.languageserver.LanguageServerController.Retry import org.enso.projectmanager.infrastructure.languageserver.LanguageServerProtocol._ -import org.enso.projectmanager.infrastructure.languageserver.LanguageServerRegistry.ServerShutDown import org.enso.projectmanager.service.LoggingServiceDescriptor import org.enso.projectmanager.util.UnhandledLogging import org.enso.projectmanager.versionmanagement.DistributionConfiguration import java.util.UUID - import scala.concurrent.duration._ /** An actor that routes request regarding lang. server lifecycle to the @@ -41,10 +39,14 @@ class LanguageServerRegistry( with LazyLogging with UnhandledLogging { + import LanguageServerRegistry._ + import context.dispatcher + override def receive: Receive = running() private def running( - serverControllers: Map[UUID, ActorRef] = Map.empty + serverControllers: Map[UUID, ActorRef] = Map.empty, + pendingReplies: Map[UUID, (Seq[ActorRef], Cancellable)] = Map.empty ): Receive = { case msg @ StartServer( _, @@ -75,7 +77,12 @@ class LanguageServerRegistry( ) context.watch(controller) controller.forward(msg) - context.become(running(serverControllers + (project.id -> controller))) + context.become( + running( + serverControllers + (project.id -> controller), + pendingReplies + ) + ) } case msg @ StopServer(_, projectId) => @@ -94,7 +101,7 @@ class LanguageServerRegistry( "language-server-killer" ) killer.forward(msg) - context.become(running()) + context.become(running(pendingReplies = pendingReplies)) case ServerShutDown(projectId) => context.become(running(serverControllers - projectId)) @@ -107,10 +114,83 @@ class LanguageServerRegistry( } case Terminated(ref) => - context.become(running(serverControllers.filterNot(_._2 == ref))) + val pending = serverControllers + .find(_._2 == ref) + .map(e => pendingReplies.filterNot(_._1 == e._1)) + .getOrElse(pendingReplies) + context.become(running(serverControllers.filterNot(_._2 == ref), pending)) case CheckIfServerIsRunning(projectId) => - sender() ! serverControllers.contains(projectId) + serverControllers.get(projectId) match { + case Some(ref) => + val replyTo = sender() + pendingReplies.get(projectId) match { + case Some((prevRefs, cancellable)) if !prevRefs.contains(replyTo) => + context.become( + running( + serverControllers, + pendingReplies + ( + ( + projectId, + (replyTo +: prevRefs, cancellable) + ) + ) + ) + ) + case None => + val scheduledRequest = + context.system.scheduler.scheduleOnce( + timeoutConfig.requestTimeout, + self, + LanguageServerStatusTimeout(projectId) + ) + ref ! LanguageServerStatusRequest + context.become( + running( + serverControllers, + pendingReplies + ( + ( + projectId, + (Seq(replyTo), scheduledRequest) + ) + ) + ) + ) + case _ => + // Do nothing, still waiting + } + case None => + sender() ! (false, false) + } + + case LanguageServerStatus(uuid, shuttingDown) => + pendingReplies.get(uuid) match { + case Some((replyTo, cancellable)) => + cancellable.cancel() + replyTo.foreach(_ ! (true, shuttingDown)) + context.become( + running(serverControllers, pendingReplies.filterNot(_._1 == uuid)) + ) + case None => + logger.warn( + "Unknown request for language server state for project {}", + uuid + ) + } + + case LanguageServerStatusTimeout(uuid) => + pendingReplies.get(uuid) match { + case Some((replyTo, _)) => + replyTo.foreach(_ ! CheckTimeout) + context.become( + running(serverControllers, pendingReplies.filterNot(_._1 == uuid)) + ) + case None => + logger.warn( + "Unknown request for language server status for project {}", + uuid + ) + } case Retry(message) => context.system.scheduler.scheduleOnce(200.millis, self, message)( @@ -165,4 +245,21 @@ object LanguageServerRegistry { ) ) + /** A notification that no project status response has been received within a timeout. + * + * @param projectId uuid of a project that its language server failed to report on + */ + case class LanguageServerStatusTimeout(projectId: UUID) + + /** The state of language server for a given project. + * + * @param projectId uuid of the project + * @param shuttingDown if true, the project is currently in a soft shutdown state, false otherwise + */ + case class LanguageServerStatus(projectId: UUID, shuttingDown: Boolean) + + /** A message requesting the current state of the language server. + */ + case object LanguageServerStatusRequest + } diff --git a/lib/scala/project-manager/src/main/scala/org/enso/projectmanager/protocol/ClientController.scala b/lib/scala/project-manager/src/main/scala/org/enso/projectmanager/protocol/ClientController.scala index 6175f54a230f1..c8d5973f14b79 100644 --- a/lib/scala/project-manager/src/main/scala/org/enso/projectmanager/protocol/ClientController.scala +++ b/lib/scala/project-manager/src/main/scala/org/enso/projectmanager/protocol/ClientController.scala @@ -65,6 +65,13 @@ class ClientController[F[+_, +_]: Exec: CovariantFlatMap: ErrorChannel]( timeoutConfig.bootTimeout, timeoutConfig.retries ), + ProjectStatus -> ProjectStatusHandler + .props[F]( + clientId, + projectService, + timeoutConfig.bootTimeout, + timeoutConfig.retries + ), ProjectClose -> ProjectCloseHandler .props[F]( clientId, @@ -106,19 +113,19 @@ class ClientController[F[+_, +_]: Exec: CovariantFlatMap: ErrorChannel]( ) override def receive: Receive = { - case JsonRpcServer.WebConnect(webActor) => + case JsonRpcServer.WebConnect(webActor, port) => logger.info("Client connected to Project Manager [{}]", clientId) unstashAll() context.become(connected(webActor)) - context.system.eventStream.publish(ClientConnected(clientId)) + context.system.eventStream.publish(ClientConnected(clientId, port)) case _ => stash() } def connected(@unused webActor: ActorRef): Receive = { - case MessageHandler.Disconnected => + case MessageHandler.Disconnected(port) => logger.info("Client disconnected from the Project Manager [{}]", clientId) - context.system.eventStream.publish(ClientDisconnected(clientId)) + context.system.eventStream.publish(ClientDisconnected(clientId, port)) context.stop(self) case r @ Request(method, _, _) if requestHandlers.contains(method) => diff --git a/lib/scala/project-manager/src/main/scala/org/enso/projectmanager/protocol/JsonRpc.scala b/lib/scala/project-manager/src/main/scala/org/enso/projectmanager/protocol/JsonRpc.scala index fc7b887efed10..dd27da6e8c645 100644 --- a/lib/scala/project-manager/src/main/scala/org/enso/projectmanager/protocol/JsonRpc.scala +++ b/lib/scala/project-manager/src/main/scala/org/enso/projectmanager/protocol/JsonRpc.scala @@ -18,6 +18,7 @@ object JsonRpc { lazy val protocol: Protocol = Protocol.empty .registerRequest(ProjectCreate) + .registerRequest(ProjectStatus) .registerRequest(ProjectDelete) .registerRequest(ProjectOpen) .registerRequest(ProjectClose) diff --git a/lib/scala/project-manager/src/main/scala/org/enso/projectmanager/protocol/ProjectManagementApi.scala b/lib/scala/project-manager/src/main/scala/org/enso/projectmanager/protocol/ProjectManagementApi.scala index a0f21a061c712..13698b78cd12e 100644 --- a/lib/scala/project-manager/src/main/scala/org/enso/projectmanager/protocol/ProjectManagementApi.scala +++ b/lib/scala/project-manager/src/main/scala/org/enso/projectmanager/protocol/ProjectManagementApi.scala @@ -1,7 +1,6 @@ package org.enso.projectmanager.protocol import java.util.UUID - import io.circe.Json import io.circe.syntax._ import nl.gn0s1s.bump.SemVer @@ -11,6 +10,7 @@ import org.enso.projectmanager.data.{ EngineVersion, MissingComponentAction, ProjectMetadata, + RunningStatus, Socket } @@ -99,6 +99,25 @@ object ProjectManagementApi { } } + case object ProjectStatus extends Method("project/status") { + + case class Params( + projectId: UUID + ) + + case class Result(status: RunningStatus) + + implicit val hasParams: HasParams.Aux[this.type, ProjectStatus.Params] = + new HasParams[this.type] { + type Params = ProjectStatus.Params + } + + implicit val hasResult: HasResult.Aux[this.type, ProjectStatus.Result] = + new HasResult[this.type] { + type Result = ProjectStatus.Result + } + } + case object ProjectClose extends Method("project/close") { case class Params(projectId: UUID) @@ -311,6 +330,9 @@ object ProjectManagementApi { case object CannotRemoveOpenProjectError extends Error(4008, "Cannot remove open project") + case object CannotRemoveClosingProjectError + extends Error(4014, "Cannot remove closing project") + case class ProjectCloseError(msg: String) extends Error(4009, msg) case class LanguageServerError(msg: String) extends Error(4010, msg) diff --git a/lib/scala/project-manager/src/main/scala/org/enso/projectmanager/requesthandler/ProjectServiceFailureMapper.scala b/lib/scala/project-manager/src/main/scala/org/enso/projectmanager/requesthandler/ProjectServiceFailureMapper.scala index 65c3f25fa7fba..8ba6819b6ef52 100644 --- a/lib/scala/project-manager/src/main/scala/org/enso/projectmanager/requesthandler/ProjectServiceFailureMapper.scala +++ b/lib/scala/project-manager/src/main/scala/org/enso/projectmanager/requesthandler/ProjectServiceFailureMapper.scala @@ -25,6 +25,7 @@ object ProjectServiceFailureMapper { case ProjectNotOpen => ProjectNotOpenError case ProjectOpenByOtherPeers => ProjectOpenByOtherPeersError case CannotRemoveOpenProject => CannotRemoveOpenProjectError + case CannotRemoveClosingProject => CannotRemoveClosingProjectError case ProjectOperationTimeout => ServiceError case LanguageServerFailure(msg) => LanguageServerError(msg) case ProjectManagerUpgradeRequiredFailure(current, required) => diff --git a/lib/scala/project-manager/src/main/scala/org/enso/projectmanager/requesthandler/ProjectStatusHandler.scala b/lib/scala/project-manager/src/main/scala/org/enso/projectmanager/requesthandler/ProjectStatusHandler.scala new file mode 100644 index 0000000000000..ad01847ba5dcd --- /dev/null +++ b/lib/scala/project-manager/src/main/scala/org/enso/projectmanager/requesthandler/ProjectStatusHandler.scala @@ -0,0 +1,83 @@ +package org.enso.projectmanager.requesthandler + +import akka.actor.Props +import org.enso.projectmanager.control.core.CovariantFlatMap +import org.enso.projectmanager.control.core.syntax._ +import org.enso.projectmanager.control.effect.Exec +import org.enso.projectmanager.data.RunningStatus +import org.enso.projectmanager.protocol.ProjectManagementApi.ProjectStatus +import org.enso.projectmanager.requesthandler.ProjectServiceFailureMapper.failureMapper +import org.enso.projectmanager.service.{ + ProjectServiceApi, + ProjectServiceFailure +} + +import java.util.UUID +import scala.concurrent.duration.FiniteDuration + +/** A request handler for `project/status` commands. + * + * @param clientId the requester id + * @param projectService a project service + * @param requestTimeout a request timeout + * @param timeoutRetries a number of timeouts to wait until a failure is reported + */ +class ProjectStatusHandler[F[+_, +_]: Exec: CovariantFlatMap]( + clientId: UUID, + projectService: ProjectServiceApi[F], + requestTimeout: FiniteDuration, + timeoutRetries: Int +) extends RequestHandler[ + F, + ProjectServiceFailure, + ProjectStatus.type, + ProjectStatus.Params, + ProjectStatus.Result + ]( + ProjectStatus, + // TODO [RW] maybe we can get rid of this timeout since boot timeout is + // handled by the LanguageServerProcess; still the ? message of + // LanguageServerGateway will result in timeouts (#1315) + Some(requestTimeout), + timeoutRetries + ) { + + override def handleRequest = { params => + for { + server <- projectService.getProjectStatus( + clientId = clientId, + projectId = params.projectId + ) + } yield ProjectStatus.Result(status = + RunningStatus(server.open, server.shuttingDown) + ) + } + +} + +object ProjectStatusHandler { + + /** Creates a configuration object used to create a [[ProjectStatusHandler]]. + * + * @param clientId the requester id + * @param projectService a project service + * @param requestTimeout a request timeout + * @param timeoutRetries a number of timeouts to wait until a failure is reported + * @return a configuration object + */ + def props[F[+_, +_]: Exec: CovariantFlatMap]( + clientId: UUID, + projectService: ProjectServiceApi[F], + requestTimeout: FiniteDuration, + timeoutRetries: Int + ): Props = + Props( + new ProjectStatusHandler( + clientId, + projectService, + requestTimeout, + timeoutRetries + ) + ) + +} diff --git a/lib/scala/project-manager/src/main/scala/org/enso/projectmanager/service/ProjectService.scala b/lib/scala/project-manager/src/main/scala/org/enso/projectmanager/service/ProjectService.scala index 4b86b6b0fea2f..9ea5aa57c2ff3 100644 --- a/lib/scala/project-manager/src/main/scala/org/enso/projectmanager/service/ProjectService.scala +++ b/lib/scala/project-manager/src/main/scala/org/enso/projectmanager/service/ProjectService.scala @@ -16,6 +16,7 @@ import org.enso.projectmanager.control.core.{ import org.enso.projectmanager.control.effect.syntax._ import org.enso.projectmanager.control.effect.{ErrorChannel, Sync} import org.enso.projectmanager.data.{ + LanguageServerStatus, MissingComponentAction, ProjectMetadata, RunningLanguageServerInfo @@ -147,13 +148,14 @@ class ProjectService[ ): F[ProjectServiceFailure, Unit] = isServerRunning(projectId) .flatMap { - case false => CovariantFlatMap[F].pure(()) - case true => ErrorChannel[F].fail(CannotRemoveOpenProject) + case (false, _) => CovariantFlatMap[F].pure(()) + case (true, true) => ErrorChannel[F].fail(CannotRemoveClosingProject) + case (true, false) => ErrorChannel[F].fail(CannotRemoveOpenProject) } private def isServerRunning( projectId: UUID - ): F[ProjectServiceFailure, Boolean] = + ): F[ProjectServiceFailure, (Boolean, Boolean)] = languageServerGateway .isRunning(projectId) .mapError(_ => ProjectOperationTimeout) @@ -186,7 +188,7 @@ class ProjectService[ ): F[ProjectServiceFailure, Unit] = { val cmd = new MoveProjectDirCmd[F](projectId, newName, repo, log) CovariantFlatMap[F] - .ifM(isServerRunning(projectId))( + .ifM(isServerRunning(projectId).map(_._1))( ifTrue = for { _ <- log.debug( "Registering shutdown hook to rename the project [{}] " + @@ -519,4 +521,20 @@ class ProjectService[ } + /** Retrieve project info. + * + * @param clientId the requester id + * @param projectId the project id + * @return either failure or [[LanguageServerStatus]] representing success + */ + override def getProjectStatus( + clientId: UUID, + projectId: UUID + ): F[ProjectServiceFailure, LanguageServerStatus] = { + log.debug(s"Retrieving the state of project [{}].", projectId) + languageServerGateway + .isRunning(projectId) + .map(e => LanguageServerStatus(e._1, e._2)) + .mapError(_ => LanguageServerFailure("failed to retrieve project state")) + } } diff --git a/lib/scala/project-manager/src/main/scala/org/enso/projectmanager/service/ProjectServiceApi.scala b/lib/scala/project-manager/src/main/scala/org/enso/projectmanager/service/ProjectServiceApi.scala index 5fdc410f7b90c..7c68385182e7f 100644 --- a/lib/scala/project-manager/src/main/scala/org/enso/projectmanager/service/ProjectServiceApi.scala +++ b/lib/scala/project-manager/src/main/scala/org/enso/projectmanager/service/ProjectServiceApi.scala @@ -1,10 +1,10 @@ package org.enso.projectmanager.service import java.util.UUID - import akka.actor.ActorRef import nl.gn0s1s.bump.SemVer import org.enso.projectmanager.data.{ + LanguageServerStatus, MissingComponentAction, ProjectMetadata, RunningLanguageServerInfo @@ -67,6 +67,17 @@ trait ProjectServiceApi[F[+_, +_]] { missingComponentAction: MissingComponentAction ): F[ProjectServiceFailure, RunningLanguageServerInfo] + /** Retrieve project status. + * + * @param clientId the requester id + * @param projectId the project id + * @return either failure or [[LanguageServerStatus]] representing success + */ + def getProjectStatus( + clientId: UUID, + projectId: UUID + ): F[ProjectServiceFailure, LanguageServerStatus] + /** Closes a project. Tries to shut down the Language Server. * * @param clientId the requester id diff --git a/lib/scala/project-manager/src/main/scala/org/enso/projectmanager/service/ProjectServiceFailure.scala b/lib/scala/project-manager/src/main/scala/org/enso/projectmanager/service/ProjectServiceFailure.scala index 5262991411a9c..cf11340e10b46 100644 --- a/lib/scala/project-manager/src/main/scala/org/enso/projectmanager/service/ProjectServiceFailure.scala +++ b/lib/scala/project-manager/src/main/scala/org/enso/projectmanager/service/ProjectServiceFailure.scala @@ -59,6 +59,10 @@ object ProjectServiceFailure { */ case object CannotRemoveOpenProject extends ProjectServiceFailure + /** Signals that removal of project failed because project is still shutting down. + */ + case object CannotRemoveClosingProject extends ProjectServiceFailure + /** Signals operation timeout. */ case object ProjectOperationTimeout extends ProjectServiceFailure diff --git a/lib/scala/project-manager/src/test/scala/org/enso/projectmanager/infrastructure/languageserver/LanguageServerGatewaySpec.scala b/lib/scala/project-manager/src/test/scala/org/enso/projectmanager/infrastructure/languageserver/LanguageServerGatewaySpec.scala index c75f8aed95d16..9279c9e1af1d0 100644 --- a/lib/scala/project-manager/src/test/scala/org/enso/projectmanager/infrastructure/languageserver/LanguageServerGatewaySpec.scala +++ b/lib/scala/project-manager/src/test/scala/org/enso/projectmanager/infrastructure/languageserver/LanguageServerGatewaySpec.scala @@ -2,6 +2,7 @@ package org.enso.projectmanager.infrastructure.languageserver import akka.testkit.TestDuration import nl.gn0s1s.bump.SemVer +import io.circe.literal._ import org.enso.projectmanager.test.Net._ import org.enso.projectmanager.{BaseServerSpec, ProjectManagementOps} import org.enso.testkit.{FlakySpec, RetrySpec} @@ -43,6 +44,35 @@ class LanguageServerGatewaySpec deleteProject(bazId) } + "report language server status" in { + implicit val client = new WsTestClient(address) + val fooId = createProject("foo") + //val fooSocket = + openProject(fooId) + client.send(s""" + { "jsonrpc": "2.0", + "method": "project/status", + "id": 1, + "params": { + "projectId": "$fooId" + } + } + """) + client.expectJson(json""" + { "jsonrpc": "2.0", + "id": 1, + "result": { + "status" : { + "open" : true, + "shuttingDown" : false + } + } + } + """) + closeProject(fooId) + deleteProject(fooId) + } + } } diff --git a/lib/scala/project-manager/src/test/scala/org/enso/projectmanager/protocol/ProjectShutdownSpec.scala b/lib/scala/project-manager/src/test/scala/org/enso/projectmanager/protocol/ProjectShutdownSpec.scala index a920b07900b7d..021399e728f03 100644 --- a/lib/scala/project-manager/src/test/scala/org/enso/projectmanager/protocol/ProjectShutdownSpec.scala +++ b/lib/scala/project-manager/src/test/scala/org/enso/projectmanager/protocol/ProjectShutdownSpec.scala @@ -1,6 +1,7 @@ package org.enso.projectmanager.protocol import akka.actor.ActorRef +import io.circe.literal._ import nl.gn0s1s.bump.SemVer import org.enso.jsonrpc.ClientControllerFactory import org.enso.projectmanager.boot.configuration.TimeoutConfig @@ -51,28 +52,97 @@ class ProjectShutdownSpec config.timeout.copy(delayedShutdownTimeout = delayedShutdown) } + "ensure language server shuts down immediately when requesting to close the project" in { + val client1 = new WsTestClient(address) + val projectId = createProject("Foo")(client1) + openProject(projectId)(client1) + closeProject(projectId)(client1) + deleteProject(projectId)(client1) + } + "ensure language server does not shutdown immediately after last client disconnects" in { val client1 = new WsTestClient(address) val projectId = createProject("Foo")(client1) val socket1 = openProject(projectId)(client1) system.eventStream.publish( - ClientDisconnected(clientUUID) + ClientDisconnected(clientUUID, socket1.port) ) + client1.send(s""" + { "jsonrpc": "2.0", + "method": "project/status", + "id": 1, + "params": { + "projectId": "$projectId" + } + } + """) + client1.expectJson(json""" + { "jsonrpc": "2.0", + "id": 1, + "result": { + "status" : { + "open" : true, + "shuttingDown" : true + } + } + } + """) val client2 = new WsTestClient(address) val socket2 = openProject(projectId)(client2) socket2 shouldBe socket1 + client2.send(s""" + { "jsonrpc": "2.0", + "method": "project/status", + "id": 1, + "params": { + "projectId": "$projectId" + } + } + """) + client2.expectJson(json""" + { "jsonrpc": "2.0", + "id": 1, + "result": { + "status" : { + "open" : true, + "shuttingDown" : false + } + } + } + """) + closeProject(projectId)(client2) deleteProject(projectId)(client2) } "ensure language server does eventually shutdown after last client disconnects" in { - val client1 = new WsTestClient(address) - val projectId = createProject("Foo")(client1) - val socket1 = openProject(projectId)(client1) + val client = new WsTestClient(address) + val projectId = createProject("Foo")(client) + val socket1 = openProject(projectId)(client) system.eventStream.publish( - ClientDisconnected(clientUUID) + ClientDisconnected(clientUUID, socket1.port) ) + client.send(s""" + { "jsonrpc": "2.0", + "method": "project/status", + "id": 1, + "params": { + "projectId": "$projectId" + } + } + """) + client.expectJson(json""" + { "jsonrpc": "2.0", + "id": 1, + "result": { + "status" : { + "open" : true, + "shuttingDown" : true + } + } + } + """) Thread.sleep( (timeoutConfig.delayedShutdownTimeout + timeoutConfig.shutdownTimeout + 1.second).toMillis )