From 5751386570ce8fdfbde25405f8a59d19c2b7314a Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Rados=C5=82aw=20Wa=C5=9Bko?= Date: Fri, 27 Aug 2021 14:01:13 +0200 Subject: [PATCH] Implement `library/preinstall` (Without Dependencies Yet) (#1972) --- RELEASES.md | 5 + build.sbt | 22 +- .../enso/languageserver/boot/MainModule.scala | 55 +++-- .../libraries/EditionReferenceResolver.scala | 15 +- .../libraries/LibraryConfig.scala | 4 +- .../libraries/LibraryInstallerConfig.scala | 17 ++ .../handler/LibraryPreinstallHandler.scala | 199 ++++++++++++--- .../json/JsonConnectionController.scala | 5 +- .../runtime/RuntimeConnector.scala | 98 ++++++-- .../websocket/json/BaseServerTest.scala | 47 ++-- .../websocket/json/LibrariesTest.scala | 127 ++++++---- .../json/ProjectSettingsManagerTest.scala | 4 +- .../distribution/DefaultManagers.scala | 19 +- .../LauncherResourceManager.scala | 82 +++++++ .../installation/DistributionInstaller.scala | 5 +- .../DistributionUninstaller.scala | 5 +- .../launcher/LauncherConcurrencyTest.scala | 102 ++++++++ .../org/enso/polyglot/runtime/Runtime.scala | 230 ++++++++++++------ .../java/org/enso/interpreter/Language.java | 33 ++- .../instrument/RuntimeServerInstrument.java | 15 +- .../org/enso/interpreter/runtime/Context.java | 24 +- .../interpreter/service/ExecutionService.java | 17 +- .../org/enso/compiler/PackageRepository.scala | 2 +- .../enso/interpreter/instrument/Handler.scala | 52 ++-- .../instrument/command/CommandFactory.scala | 5 + .../test/instrument/RuntimeErrorsTest.scala | 42 ++-- .../instrument/RuntimeInstrumentTest.scala | 42 ++-- .../instrument/RuntimeServerEmulator.scala | 83 +++++++ .../test/instrument/RuntimeServerTest.scala | 35 +-- .../test/instrument/RuntimeStdlibTest.scala | 35 +-- .../RuntimeSuggestionUpdatesTest.scala | 45 ++-- .../RuntimeVisualisationsTest.scala | 42 ++-- .../TestRuntimeServerConnector.scala | 40 +++ .../client/ConnectedLockManager.scala | 104 ++++++++ .../RuntimeServerConnectionEndpoint.scala | 16 ++ .../client/RuntimeServerRequestHandler.scala | 71 ++++++ .../server/LockManagerService.scala | 155 ++++++++++++ .../lockmanager/ActorToHandlerConnector.scala | 44 ++++ .../ConnectedLockManagerTest.scala | 154 ++++++++++++ .../org/enso/distribution/LanguageHome.scala | 6 +- .../locking/ResourceManager.scala | 73 ------ .../locking/ThreadSafeFileLockManager.scala | 2 +- .../locking/ThreadSafeLockManager.scala | 4 + .../jsonrpc/test/JsonRpcServerTestKit.scala | 3 +- .../published/repository/DownloaderTest.scala | 1 - .../DefaultLibraryProvider.scala | 2 +- .../ResolvingLibraryProvider.scala | 7 +- .../protocol/ProjectManagementApiSpec.scala | 13 +- .../test/SlowTestSynchronizer.scala | 7 + .../test/TestLocalLockManager.scala | 12 +- .../TestableThreadSafeFileLockManager.scala | 27 ++ .../locking/ConcurrencyTest.scala | 82 +------ .../ThreadSafeFileLockManagerTest.scala | 36 +-- .../org/enso/testkit}/TestSynchronizer.scala | 5 +- .../enso/testkit/WithTemporaryDirectory.scala | 2 +- tools/legal-review/engine/report-state | 2 +- 56 files changed, 1739 insertions(+), 642 deletions(-) create mode 100644 engine/language-server/src/main/scala/org/enso/languageserver/libraries/LibraryInstallerConfig.scala create mode 100644 engine/launcher/src/main/scala/org/enso/launcher/distribution/LauncherResourceManager.scala create mode 100644 engine/launcher/src/test/scala/org/enso/launcher/LauncherConcurrencyTest.scala create mode 100644 engine/runtime/src/test/scala/org/enso/interpreter/test/instrument/RuntimeServerEmulator.scala create mode 100644 engine/runtime/src/test/scala/org/enso/interpreter/test/instrument/TestRuntimeServerConnector.scala create mode 100644 lib/scala/connected-lock-manager/src/main/scala/org/enso/lockmanager/client/ConnectedLockManager.scala create mode 100644 lib/scala/connected-lock-manager/src/main/scala/org/enso/lockmanager/client/RuntimeServerConnectionEndpoint.scala create mode 100644 lib/scala/connected-lock-manager/src/main/scala/org/enso/lockmanager/client/RuntimeServerRequestHandler.scala create mode 100644 lib/scala/connected-lock-manager/src/main/scala/org/enso/lockmanager/server/LockManagerService.scala create mode 100644 lib/scala/connected-lock-manager/src/test/scala/org/enso/lockmanager/ActorToHandlerConnector.scala create mode 100644 lib/scala/connected-lock-manager/src/test/scala/org/enso/lockmanager/ConnectedLockManagerTest.scala create mode 100644 lib/scala/distribution-manager/src/main/scala/org/enso/distribution/locking/ThreadSafeLockManager.scala create mode 100644 lib/scala/runtime-version-manager-test/src/main/scala/org/enso/runtimeversionmanager/test/SlowTestSynchronizer.scala create mode 100644 lib/scala/runtime-version-manager-test/src/main/scala/org/enso/runtimeversionmanager/test/TestableThreadSafeFileLockManager.scala rename lib/scala/{runtime-version-manager-test/src/main/scala/org/enso/runtimeversionmanager/test => testkit/src/main/scala/org/enso/testkit}/TestSynchronizer.scala (99%) diff --git a/RELEASES.md b/RELEASES.md index e47564eb72e5..621dd63b2645 100644 --- a/RELEASES.md +++ b/RELEASES.md @@ -1,5 +1,10 @@ # Enso Next +- Implement `library/preinstall` endpoint, allowing the IDE to request a library + to be installed asynchronously before importing it, so that adding the import + does not seem to freeze the compiler + ([#1972](https://github.com/enso-org/enso/pull/1972)). + # Enso 0.2.27 (2021-08-23) ## Libraries diff --git a/build.sbt b/build.sbt index 1e8f9650aa4c..710adcda2376 100644 --- a/build.sbt +++ b/build.sbt @@ -242,6 +242,7 @@ lazy val enso = (project in file(".")) `edition-uploader`, `library-manager`, `library-manager-test`, + `connected-lock-manager`, `stdlib-version-updater`, syntax.jvm, testkit @@ -1011,6 +1012,7 @@ lazy val `language-server` = (project in file("engine/language-server")) .dependsOn(`json-rpc-server`) .dependsOn(`task-progress-notifications`) .dependsOn(`library-manager`) + .dependsOn(`connected-lock-manager`) .dependsOn(`edition-updater`) .dependsOn(`logging-service`) .dependsOn(`polyglot-api`) @@ -1100,7 +1102,7 @@ lazy val runtime = (project in file("engine/runtime")) "org.scalatest" %% "scalatest" % scalatestVersion % Test, "org.graalvm.truffle" % "truffle-api" % graalVersion % Benchmark, "org.typelevel" %% "cats-core" % catsVersion, - "eu.timepit" %% "refined" % refinedVersion, + "eu.timepit" %% "refined" % refinedVersion ), // Note [Unmanaged Classpath] Compile / unmanagedClasspath += (`core-definition` / Compile / packageBin).value, @@ -1186,6 +1188,9 @@ lazy val runtime = (project in file("engine/runtime")) .dependsOn(graph) .dependsOn(pkg) .dependsOn(searcher) + .dependsOn(`edition-updater`) + .dependsOn(`library-manager`) + .dependsOn(`connected-lock-manager`) .dependsOn(syntax.jvm) .dependsOn(testkit % Test) @@ -1424,6 +1429,21 @@ lazy val `library-manager-test` = project .dependsOn(testkit) .dependsOn(`logging-service`) +lazy val `connected-lock-manager` = project + .in(file("lib/scala/connected-lock-manager")) + .configs(Test) + .settings( + libraryDependencies ++= Seq( + "com.typesafe.scala-logging" %% "scala-logging" % scalaLoggingVersion, + akkaActor, + akkaTestkit % Test, + "org.scalatest" %% "scalatest" % scalatestVersion % Test + ) + ) + .dependsOn(`distribution-manager`) + .dependsOn(`polyglot-api`) + .dependsOn(testkit % Test) + lazy val `stdlib-version-updater` = project .in(file("lib/scala/stdlib-version-updater")) .configs(Test) diff --git a/engine/language-server/src/main/scala/org/enso/languageserver/boot/MainModule.scala b/engine/language-server/src/main/scala/org/enso/languageserver/boot/MainModule.scala index 55646873a5ba..2c6b3c9caeda 100644 --- a/engine/language-server/src/main/scala/org/enso/languageserver/boot/MainModule.scala +++ b/engine/language-server/src/main/scala/org/enso/languageserver/boot/MainModule.scala @@ -1,6 +1,10 @@ package org.enso.languageserver.boot import akka.actor.ActorSystem +import org.enso.distribution.locking.{ + ResourceManager, + ThreadSafeFileLockManager +} import org.enso.distribution.{DistributionManager, Environment, LanguageHome} import org.enso.editions.EditionResolver import org.enso.editions.updater.EditionManager @@ -15,6 +19,7 @@ import org.enso.languageserver.io._ import org.enso.languageserver.libraries.{ EditionReferenceResolver, LibraryConfig, + LibraryInstallerConfig, LocalLibraryManager, ProjectSettingsManager } @@ -40,6 +45,7 @@ import org.enso.languageserver.util.binary.BinaryEncoder import org.enso.librarymanager.LibraryLocations import org.enso.librarymanager.local.DefaultLocalLibraryProvider import org.enso.librarymanager.published.PublishedLibraryCache +import org.enso.lockmanager.server.LockManagerService import org.enso.loggingservice.{JavaLoggingLogHandler, LogLevel} import org.enso.polyglot.{RuntimeOptions, RuntimeServerInfo} import org.enso.searcher.sql.{SqlDatabase, SqlSuggestionsRepo, SqlVersionsRepo} @@ -120,8 +126,34 @@ class MainModule(serverConfig: LanguageServerConfig, logLevel: LogLevel) { lazy val sessionRouter = system.actorOf(SessionRouter.props(), "session-router") + val environment = new Environment {} + val languageHome = LanguageHome.detectFromExecutableLocation(environment) + val distributionManager = new DistributionManager(environment) + + val editionProvider = + EditionManager.makeEditionProvider(distributionManager, Some(languageHome)) + val editionResolver = EditionResolver(editionProvider) + val editionReferenceResolver = new EditionReferenceResolver( + contentRoot.file, + editionProvider, + editionResolver + ) + val editionManager = EditionManager(distributionManager, Some(languageHome)) + val lockManager = new ThreadSafeFileLockManager( + distributionManager.paths.locks + ) + val resourceManager = new ResourceManager(lockManager) + + val lockManagerService = system.actorOf( + LockManagerService.props(lockManager), + "lock-manager-service" + ) + lazy val runtimeConnector = - system.actorOf(RuntimeConnector.props, "runtime-connector") + system.actorOf( + RuntimeConnector.props(lockManagerService), + "runtime-connector" + ) lazy val contentRootManagerActor = system.actorOf( @@ -272,20 +304,6 @@ class MainModule(serverConfig: LanguageServerConfig, logLevel: LogLevel) { context )(system.dispatcher) - val environment = new Environment {} - val languageHome = LanguageHome.detectFromExecutableLocation(environment) - val distributionManager = new DistributionManager(environment) - - val editionProvider = - EditionManager.makeEditionProvider(distributionManager, Some(languageHome)) - val editionResolver = EditionResolver(editionProvider) - val editionReferenceResolver = new EditionReferenceResolver( - contentRoot.file, - editionProvider, - editionResolver - ) - val editionManager = EditionManager(distributionManager, Some(languageHome)) - val projectSettingsManager = system.actorOf( ProjectSettingsManager.props(contentRoot.file, editionResolver), "project-settings-manager" @@ -305,7 +323,12 @@ class MainModule(serverConfig: LanguageServerConfig, logLevel: LogLevel) { editionManager = editionManager, localLibraryProvider = DefaultLocalLibraryProvider.make(libraryLocations), publishedLibraryCache = - PublishedLibraryCache.makeReadOnlyCache(libraryLocations) + PublishedLibraryCache.makeReadOnlyCache(libraryLocations), + installerConfig = LibraryInstallerConfig( + distributionManager, + resourceManager, + Some(languageHome) + ) ) val jsonRpcControllerFactory = new JsonConnectionControllerFactory( diff --git a/engine/language-server/src/main/scala/org/enso/languageserver/libraries/EditionReferenceResolver.scala b/engine/language-server/src/main/scala/org/enso/languageserver/libraries/EditionReferenceResolver.scala index 778be011de37..aeb694a74809 100644 --- a/engine/language-server/src/main/scala/org/enso/languageserver/libraries/EditionReferenceResolver.scala +++ b/engine/language-server/src/main/scala/org/enso/languageserver/libraries/EditionReferenceResolver.scala @@ -3,7 +3,7 @@ package org.enso.languageserver.libraries import org.enso.editions.provider.EditionProvider import org.enso.editions.{DefaultEdition, EditionResolver, Editions} import org.enso.languageserver.libraries.EditionReference.NamedEdition -import org.enso.pkg.PackageManager +import org.enso.pkg.{Config, PackageManager} import java.io.File import scala.util.Try @@ -24,14 +24,15 @@ class EditionReferenceResolver( case EditionReference.NamedEdition(editionName) => editionProvider.findEditionForName(editionName).toTry case EditionReference.CurrentProjectEdition => - Try { - projectPackage.config.edition.getOrElse { - // TODO [RW] default edition from config (#1864) - DefaultEdition.getDefaultEdition - } - } + getCurrentProjectConfig.map(_.edition.getOrElse { + // TODO [RW] default edition from config (#1864) + DefaultEdition.getDefaultEdition + }) } + /** Returns the configuration of the current project. */ + def getCurrentProjectConfig: Try[Config] = Try { projectPackage.config } + /** Resolves all edition dependencies of an edition identified by * [[EditionReference]]. */ diff --git a/engine/language-server/src/main/scala/org/enso/languageserver/libraries/LibraryConfig.scala b/engine/language-server/src/main/scala/org/enso/languageserver/libraries/LibraryConfig.scala index 0aad3bb4393f..85f3574d801f 100644 --- a/engine/language-server/src/main/scala/org/enso/languageserver/libraries/LibraryConfig.scala +++ b/engine/language-server/src/main/scala/org/enso/languageserver/libraries/LibraryConfig.scala @@ -12,11 +12,13 @@ import org.enso.librarymanager.published.PublishedLibraryCache * @param editionManager an instance of edition manager * @param localLibraryProvider an instance of local library provider * @param publishedLibraryCache an instance of published library cache + * @param installerConfig configuration for the library installer */ case class LibraryConfig( localLibraryManager: ActorRef, editionReferenceResolver: EditionReferenceResolver, editionManager: EditionManager, localLibraryProvider: LocalLibraryProvider, - publishedLibraryCache: PublishedLibraryCache + publishedLibraryCache: PublishedLibraryCache, + installerConfig: LibraryInstallerConfig ) diff --git a/engine/language-server/src/main/scala/org/enso/languageserver/libraries/LibraryInstallerConfig.scala b/engine/language-server/src/main/scala/org/enso/languageserver/libraries/LibraryInstallerConfig.scala new file mode 100644 index 000000000000..2b6f9be1cea1 --- /dev/null +++ b/engine/language-server/src/main/scala/org/enso/languageserver/libraries/LibraryInstallerConfig.scala @@ -0,0 +1,17 @@ +package org.enso.languageserver.libraries + +import org.enso.distribution.{DistributionManager, LanguageHome} +import org.enso.distribution.locking.ResourceManager + +/** Gathers configuration needed by the library installer used in the + * `library/preinstall` endpoint. + * + * @param distributionManager the distribution manager + * @param resourceManager a resource manager instance + * @param languageHome language home, if detected / applicable + */ +case class LibraryInstallerConfig( + distributionManager: DistributionManager, + resourceManager: ResourceManager, + languageHome: Option[LanguageHome] +) diff --git a/engine/language-server/src/main/scala/org/enso/languageserver/libraries/handler/LibraryPreinstallHandler.scala b/engine/language-server/src/main/scala/org/enso/languageserver/libraries/handler/LibraryPreinstallHandler.scala index 0dd2b4c8daeb..35e2901877ff 100644 --- a/engine/language-server/src/main/scala/org/enso/languageserver/libraries/handler/LibraryPreinstallHandler.scala +++ b/engine/language-server/src/main/scala/org/enso/languageserver/libraries/handler/LibraryPreinstallHandler.scala @@ -1,56 +1,183 @@ package org.enso.languageserver.libraries.handler -import akka.actor.{Actor, Props} +import akka.actor.{Actor, ActorRef, Props, Status} +import akka.pattern.pipe import com.typesafe.scalalogging.LazyLogging import org.enso.cli.task.notifications.ActorProgressNotificationForwarder -import org.enso.jsonrpc.{Request, ResponseError} +import org.enso.cli.task.{ProgressNotification, ProgressReporter} +import org.enso.distribution.ProgressAndLockNotificationForwarder +import org.enso.distribution.locking.LockUserInterface +import org.enso.editions.LibraryName +import org.enso.jsonrpc.{Id, Request, ResponseError, ResponseResult, Unused} import org.enso.languageserver.filemanager.FileManagerApi.FileSystemError -import org.enso.languageserver.libraries.FakeDownload import org.enso.languageserver.libraries.LibraryApi._ +import org.enso.languageserver.libraries.handler.LibraryPreinstallHandler.{ + InstallationResult, + InstallerError, + InternalError +} +import org.enso.languageserver.libraries.{ + EditionReference, + EditionReferenceResolver, + LibraryInstallerConfig +} import org.enso.languageserver.util.UnhandledLogging +import org.enso.librarymanager.ResolvingLibraryProvider.Error +import org.enso.librarymanager.{ + DefaultLibraryProvider, + ResolvedLibrary, + ResolvingLibraryProvider +} + +import java.util.concurrent.Executors +import scala.concurrent.{ExecutionContext, Future} +import scala.util.Try /** A request handler for the `library/preinstall` endpoint. * - * It is currently a stub implementation which will be refined later on. + * This request handler does not have any timeouts, because the download can + * take a very long time, highly depending on the library being downloaded + * (some libraries can be huge) and the network speed, so there is no good way + * to select a reasonable timeout. + * + * @param editionReferenceResolver an [[EditionReferenceResolver]] instance + * @param installerConfig configuration for the library installer */ -class LibraryPreinstallHandler - extends Actor +class LibraryPreinstallHandler( + editionReferenceResolver: EditionReferenceResolver, + installerConfig: LibraryInstallerConfig +) extends Actor with LazyLogging with UnhandledLogging { - override def receive: Receive = { - case Request(LibraryPreinstall, id, LibraryPreinstall.Params(_, name)) => - // TODO [RW] actual implementation - val progressReporter = - ActorProgressNotificationForwarder.translateAndForward( - LibraryPreinstall.name, - sender() - ) - - if (name == "Test") { - FakeDownload.simulateDownload( - "Download Test", - progressReporter, - seconds = 1 - ) - } else { - FakeDownload.simulateDownload( - "Downloading something...", - progressReporter - ) - FakeDownload.simulateDownload( - "Downloading something else...", - progressReporter - ) + + implicit private val ec: ExecutionContext = + ExecutionContext.fromExecutor(Executors.newCachedThreadPool()) + + override def receive: Receive = requestStage + + private def requestStage: Receive = { + case Request( + LibraryPreinstall, + id, + LibraryPreinstall.Params(namespace, name) + ) => + val replyTo = sender() + val libraryName = LibraryName(namespace, name) + val notificationForwarder = new ProgressAndLockNotificationForwarder { + override def sendProgressNotification( + notification: ProgressNotification + ): Unit = + replyTo ! ActorProgressNotificationForwarder + .translateProgressNotification(LibraryPreinstall.name, notification) } - sender() ! ResponseError( - Some(id), - FileSystemError("Feature not implemented") - ) + + val installation: Future[InstallationResult] = Future { + val result = for { + libraryInstaller <- getLibraryProvider( + notificationForwarder + ).toEither.left.map(InternalError) + library <- libraryInstaller + .findLibrary(libraryName) + .left + .map(InstallerError) + } yield library + InstallationResult(result) + } + installation pipeTo self + + context.become(responseStage(id, replyTo, libraryName)) } + + private def responseStage( + requestId: Id, + replyTo: ActorRef, + libraryName: LibraryName + ): Receive = { + case InstallationResult(result) => + result match { + case Left(error) => + val errorMessage = error match { + case InternalError(throwable) => + FileSystemError(s"Internal error: ${throwable.getMessage}") + case InstallerError(Error.NotResolved(_)) => + LibraryNotResolved(libraryName) + case InstallerError(Error.RequestedLocalLibraryDoesNotExist) => + LocalLibraryNotFound(libraryName) + case InstallerError(Error.DownloadFailed(version, reason)) => + LibraryDownloadError(libraryName, version, reason.getMessage) + } + replyTo ! ResponseError( + Some(requestId), + errorMessage + ) + case Right(_) => + replyTo ! ResponseResult(LibraryPreinstall, requestId, Unused) + } + + context.stop(self) + + case Status.Failure(throwable) => + self ! Left(InternalError(throwable)) + } + + private def getLibraryProvider( + notificationReporter: ProgressReporter with LockUserInterface + ): Try[ResolvingLibraryProvider] = + for { + config <- editionReferenceResolver.getCurrentProjectConfig + edition <- editionReferenceResolver.resolveEdition( + EditionReference.CurrentProjectEdition + ) + } yield DefaultLibraryProvider.make( + distributionManager = installerConfig.distributionManager, + resourceManager = installerConfig.resourceManager, + lockUserInterface = notificationReporter, + progressReporter = notificationReporter, + languageHome = installerConfig.languageHome, + edition = edition, + preferLocalLibraries = config.preferLocalLibraries + ) } object LibraryPreinstallHandler { - /** Creates a configuration object to create [[LibraryPreinstallHandler]]. */ - def props(): Props = Props(new LibraryPreinstallHandler) + /** Creates a configuration object to create [[LibraryPreinstallHandler]]. + * + * @param editionReferenceResolver an [[EditionReferenceResolver]] instance + * @param installerConfig configuration for the library installer + */ + def props( + editionReferenceResolver: EditionReferenceResolver, + installerConfig: LibraryInstallerConfig + ): Props = Props( + new LibraryPreinstallHandler(editionReferenceResolver, installerConfig) + ) + + /** An internal message used to pass the installation result from the Future + * back to the Actor. + * + * It is used, because a pattern match directly on the [[Either]] would be + * unchecked due to type erasure. + */ + case class InstallationResult( + result: Either[InstallationError, ResolvedLibrary] + ) + + /** Indicates any error that happened during the installation. */ + sealed trait InstallationError + + /** Indicates an internal error which means that the installer could not even + * be instantiated. + * + * These may include things like not being able to load current project + * configuration to deduce the edition to use for resolving the requested + * library version. + */ + case class InternalError(throwable: Throwable) extends InstallationError + + /** Indicates a more casual error that has happened during the installation - + * for example that the library was not found or that the network connection + * could not be established. + */ + case class InstallerError(error: Error) extends InstallationError } diff --git a/engine/language-server/src/main/scala/org/enso/languageserver/protocol/json/JsonConnectionController.scala b/engine/language-server/src/main/scala/org/enso/languageserver/protocol/json/JsonConnectionController.scala index 4eb8260b0c32..4cde27c762fd 100644 --- a/engine/language-server/src/main/scala/org/enso/languageserver/protocol/json/JsonConnectionController.scala +++ b/engine/language-server/src/main/scala/org/enso/languageserver/protocol/json/JsonConnectionController.scala @@ -513,7 +513,10 @@ class JsonConnectionController( .props(requestTimeout, libraryConfig.localLibraryManager), LibraryGetMetadata -> LibraryGetMetadataHandler .props(requestTimeout, libraryConfig.localLibraryManager), - LibraryPreinstall -> LibraryPreinstallHandler.props(), + LibraryPreinstall -> LibraryPreinstallHandler.props( + libraryConfig.editionReferenceResolver, + libraryConfig.installerConfig + ), LibraryPublish -> LibraryPublishHandler .props(requestTimeout, libraryConfig.localLibraryManager), LibrarySetMetadata -> LibrarySetMetadataHandler diff --git a/engine/language-server/src/main/scala/org/enso/languageserver/runtime/RuntimeConnector.scala b/engine/language-server/src/main/scala/org/enso/languageserver/runtime/RuntimeConnector.scala index 17f0a3ac99ad..a25245ebba76 100644 --- a/engine/language-server/src/main/scala/org/enso/languageserver/runtime/RuntimeConnector.scala +++ b/engine/language-server/src/main/scala/org/enso/languageserver/runtime/RuntimeConnector.scala @@ -1,17 +1,21 @@ package org.enso.languageserver.runtime -import java.nio.ByteBuffer - import akka.actor.{Actor, ActorRef, Props, Stash} import com.typesafe.scalalogging.LazyLogging +import org.enso.languageserver.runtime.RuntimeConnector.{ + Destroy, + MessageFromRuntime +} import org.enso.languageserver.util.UnhandledLogging -import org.enso.languageserver.runtime.RuntimeConnector.Destroy +import org.enso.lockmanager.server.LockManagerService import org.enso.polyglot.runtime.Runtime +import org.enso.polyglot.runtime.Runtime.{Api, ApiEnvelope} import org.graalvm.polyglot.io.MessageEndpoint -/** An actor managing a connection to Enso's runtime server. - */ -class RuntimeConnector +import java.nio.ByteBuffer + +/** An actor managing a connection to Enso's runtime server. */ +class RuntimeConnector(handlers: Map[Class[_], ActorRef]) extends Actor with LazyLogging with UnhandledLogging @@ -33,10 +37,26 @@ class RuntimeConnector } /** Performs communication between runtime and language server. - * Requests are sent from language server to runtime, - * responses are forwarded from runtime to the sender. * - * @param engine endpoint of a runtime + * Requests and responses can be sent in both directions and this Actor's + * message queue is both receiving messages from the runtime's message + * endpoint that it needs to forward to proper recipients as well as messages + * sent from other Actors to itself that it needs to forward to the runtime. + * + * Since both sides of the connection can both send requests and responses, + * the messages sent from the runtime are wrapped in [[MessageFromRuntime]], + * so that the message queue can distinguish them. + * + * Messages from other Actors to the runtime are serialized and sent to the + * [[MessageEndpoint]]. + * + * Messages from the runtime are handled depending on their type. Responses + * with a correlation id are sent to the Actor that sent the original request + * (based on a mapping kept in the state). Other responses (mostly + * notifications) are published to the system's event stream. Requests from + * the runtime are forwarded to one of the registered handlers. + * + * @param engine endpoint of a runtime * @param senders request ids with corresponding senders */ def initialized( @@ -44,15 +64,43 @@ class RuntimeConnector senders: Map[Runtime.Api.RequestId, ActorRef] ): Receive = { case Destroy => context.stop(self) - case msg: Runtime.Api.Request => + + case msg: Runtime.ApiEnvelope => engine.sendBinary(Runtime.Api.serialize(msg)) - msg.requestId.foreach { id => - context.become(initialized(engine, senders + (id -> sender()))) + + msg match { + case Api.Request(Some(id), _) => + context.become(initialized(engine, senders + (id -> sender()))) + case _ => + } + + case MessageFromRuntime(request @ Runtime.Api.Request(_, payload)) => + handlers.get(payload.getClass) match { + case Some(handler) => + handler ! request + case None => + logger.warn( + s"No registered handler found for request " + + s"[${payload.getClass.getCanonicalName}]." + ) } - case Runtime.Api.Response(None, msg: Runtime.ApiNotification) => + + case MessageFromRuntime(Runtime.Api.Response(None, msg)) => context.system.eventStream.publish(msg) - case msg @ Runtime.Api.Response(Some(correlationId), _) => - senders.get(correlationId).foreach(_ ! msg) + + case MessageFromRuntime( + msg @ Runtime.Api.Response(Some(correlationId), payload) + ) => + senders.get(correlationId) match { + case Some(sender) => + sender ! msg + case None => + logger.warn( + s"No sender has been found associated with request id " + + s"[$correlationId], the response " + + s"[${payload.getClass.getCanonicalName}] will be dropped." + ) + } context.become(initialized(engine, senders - correlationId)) } } @@ -71,14 +119,19 @@ object RuntimeConnector { /** Helper for creating instances of the [[RuntimeConnector]] actor. * + * @param lockManagerService a reference to the lock manager service actor * @return a [[Props]] instance for the newly created actor. */ - def props: Props = - Props(new RuntimeConnector) + def props(lockManagerService: ActorRef): Props = { + val lockRequests = + LockManagerService.handledRequestTypes.map(_ -> lockManagerService) + val handlers: Map[Class[_], ActorRef] = Map.from(lockRequests) + Props(new RuntimeConnector(handlers)) + } /** Endpoint implementation used to handle connections with the runtime. * - * @param actor the actor ref to pass received messages to. + * @param actor the actor ref to pass received messages to. * @param peerEndpoint the runtime server's connection end. */ class Endpoint(actor: ActorRef, peerEndpoint: MessageEndpoint) @@ -87,8 +140,8 @@ object RuntimeConnector { override def sendBinary(data: ByteBuffer): Unit = Runtime.Api - .deserializeResponse(data) - .foreach(actor ! _) + .deserializeApiEnvelope(data) + .foreach(actor ! MessageFromRuntime(_)) override def sendPing(data: ByteBuffer): Unit = peerEndpoint.sendPong(data) @@ -96,4 +149,9 @@ object RuntimeConnector { override def sendClose(): Unit = actor ! RuntimeConnector.Destroy } + + /** Wraps messages received from the runtime, to distinguish them from + * messages received from other Actors. + */ + case class MessageFromRuntime(message: ApiEnvelope) } diff --git a/engine/language-server/src/test/scala/org/enso/languageserver/websocket/json/BaseServerTest.scala b/engine/language-server/src/test/scala/org/enso/languageserver/websocket/json/BaseServerTest.scala index cc464058d680..9fc7e23e6f17 100644 --- a/engine/language-server/src/test/scala/org/enso/languageserver/websocket/json/BaseServerTest.scala +++ b/engine/language-server/src/test/scala/org/enso/languageserver/websocket/json/BaseServerTest.scala @@ -5,9 +5,10 @@ import io.circe.literal._ import io.circe.parser.parse import io.circe.syntax.EncoderOps import org.apache.commons.io.FileUtils +import org.enso.distribution.locking.ResourceManager import org.enso.distribution.{DistributionManager, LanguageHome} -import org.enso.editions.{EditionResolver, Editions} import org.enso.editions.updater.EditionManager +import org.enso.editions.{EditionResolver, Editions} import org.enso.jsonrpc.test.JsonRpcServerTestKit import org.enso.jsonrpc.{ClientControllerFactory, Protocol} import org.enso.languageserver.TestClock @@ -22,12 +23,7 @@ import org.enso.languageserver.effect.ZioExec import org.enso.languageserver.event.InitializedEvent import org.enso.languageserver.filemanager._ import org.enso.languageserver.io._ -import org.enso.languageserver.libraries.{ - EditionReferenceResolver, - LibraryConfig, - LocalLibraryManager, - ProjectSettingsManager -} +import org.enso.languageserver.libraries._ import org.enso.languageserver.monitoring.IdlenessMonitor import org.enso.languageserver.protocol.json.{ JsonConnectionControllerFactory, @@ -44,13 +40,15 @@ import org.enso.librarymanager.published.PublishedLibraryCache import org.enso.pkg.PackageManager import org.enso.polyglot.data.TypeGraph import org.enso.polyglot.runtime.Runtime.Api -import org.enso.runtimeversionmanager.test.FakeEnvironment +import org.enso.runtimeversionmanager.test.{ + FakeEnvironment, + TestableThreadSafeFileLockManager +} import org.enso.searcher.sql.{SqlDatabase, SqlSuggestionsRepo, SqlVersionsRepo} -import org.enso.testkit.{EitherValue, HasTestDirectory} +import org.enso.testkit.{EitherValue, WithTemporaryDirectory} import org.enso.text.Sha3_224VersionCalculator import org.scalatest.OptionValues -import java.nio.file import java.nio.file.{Files, Path} import java.util.UUID import scala.concurrent.Await @@ -60,7 +58,7 @@ class BaseServerTest extends JsonRpcServerTestKit with EitherValue with OptionValues - with HasTestDirectory + with WithTemporaryDirectory with FakeEnvironment { import system.dispatcher @@ -84,12 +82,7 @@ class BaseServerTest graph } - private val testDirectory = - Files.createTempDirectory("enso-test").toRealPath() - override def getTestDirectory: file.Path = testDirectory - sys.addShutdownHook(FileUtils.deleteQuietly(testContentRoot.file)) - sys.addShutdownHook(FileUtils.deleteQuietly(testDirectory.toFile)) def mkConfig: Config = Config( @@ -145,6 +138,14 @@ class BaseServerTest val contentRootManagerActor = system.actorOf(ContentRootManagerActor.props(config)) + var cleanupCallbacks: List[() => Unit] = Nil + + override def afterEach(): Unit = { + cleanupCallbacks.foreach(_()) + cleanupCallbacks = Nil + super.afterEach() + } + override def clientControllerFactory: ClientControllerFactory = { val contentRootManagerWrapper: ContentRootManager = new ContentRootManagerWrapper(config, contentRootManagerActor) @@ -228,6 +229,13 @@ class BaseServerTest val environment = fakeInstalledEnvironment() val languageHome = LanguageHome.detectFromExecutableLocation(environment) val distributionManager = new DistributionManager(environment) + val lockManager: TestableThreadSafeFileLockManager = + new TestableThreadSafeFileLockManager(distributionManager.paths.locks) + + // This is needed to be able to safely remove the temporary test directory on Windows. + cleanupCallbacks ::= { () => lockManager.releaseAllLocks() } + + val resourceManager = new ResourceManager(lockManager) val editionProvider = EditionManager.makeEditionProvider( @@ -265,7 +273,12 @@ class BaseServerTest editionManager = editionManager, localLibraryProvider = DefaultLocalLibraryProvider.make(libraryLocations), publishedLibraryCache = - PublishedLibraryCache.makeReadOnlyCache(libraryLocations) + PublishedLibraryCache.makeReadOnlyCache(libraryLocations), + installerConfig = LibraryInstallerConfig( + distributionManager, + resourceManager, + Some(languageHome) + ) ) new JsonConnectionControllerFactory( diff --git a/engine/language-server/src/test/scala/org/enso/languageserver/websocket/json/LibrariesTest.scala b/engine/language-server/src/test/scala/org/enso/languageserver/websocket/json/LibrariesTest.scala index e3d24219d602..1849cc8ef967 100644 --- a/engine/language-server/src/test/scala/org/enso/languageserver/websocket/json/LibrariesTest.scala +++ b/engine/language-server/src/test/scala/org/enso/languageserver/websocket/json/LibrariesTest.scala @@ -6,6 +6,7 @@ import nl.gn0s1s.bump.SemVer import org.enso.editions.{Editions, LibraryName} import org.enso.languageserver.libraries.LibraryEntry import org.enso.languageserver.libraries.LibraryEntry.PublishedLibraryVersion +import org.enso.librarymanager.published.bundles.LocalReadOnlyRepository import org.enso.librarymanager.published.repository.{ EmptyRepository, ExampleRepository @@ -15,18 +16,14 @@ import org.enso.pkg.{Contact, PackageManager} import java.nio.file.Files class LibrariesTest extends BaseServerTest { + private val libraryRepositoryPort: Int = 47308 + + private val exampleRepo = new ExampleRepository + private val baseUrl = s"http://localhost:$libraryRepositoryPort/" + private val repositoryUrl = baseUrl + "libraries" + override protected def customEdition: Option[Editions.RawEdition] = Some( - Editions.Raw.Edition - .make( - parent = Some(buildinfo.Info.currentEdition), - libraries = Seq( - Editions.Raw.PublishedLibrary( - name = LibraryName("Foo", "Bar"), - version = SemVer(1, 2, 3), - repository = "main" - ) - ) - ) + exampleRepo.createEdition(repositoryUrl) ) "LocalLibraryManager" should { @@ -201,8 +198,6 @@ class LibrariesTest extends BaseServerTest { """) } - def port: Int = 47308 - "create, publish a library and fetch its manifest from the server" in { val client = getInitialisedWsClient() client.send(json""" @@ -256,10 +251,12 @@ class LibrariesTest extends BaseServerTest { } """) - val baseUrl = s"http://localhost:$port/" - val repositoryUrl = baseUrl + "libraries" - val repoRoot = getTestDirectory.resolve("libraries_repo_root") - EmptyRepository.withServer(port, repoRoot, uploads = true) { + val repoRoot = getTestDirectory.resolve("libraries_repo_root") + EmptyRepository.withServer( + libraryRepositoryPort, + repoRoot, + uploads = true + ) { val uploadUrl = baseUrl + "upload" val uploadRequestId = 2 client.send(json""" @@ -360,49 +357,85 @@ class LibrariesTest extends BaseServerTest { } } - "mocked library/preinstall" should { - "send progress notifications" in { + "library/preinstall" should { + "download the library sending progress notifications " + + "and correctly place it in cache" in { val client = getInitialisedWsClient() - client.send(json""" + + val repositoryPath = getTestDirectory.resolve("repository_path") + exampleRepo.createRepository(repositoryPath) + exampleRepo.withServer(libraryRepositoryPort, repositoryPath) { + val requestId = 0 + client.send(json""" { "jsonrpc": "2.0", "method": "library/preinstall", - "id": 0, + "id": $requestId, "params": { "namespace": "Foo", - "name": "Test" + "name": "Bar" } } """) - val messages = - for (_ <- 0 to 3) yield { - val msg = client.expectSomeJson().asObject.value - val method = msg("method").map(_.asString.value).getOrElse("error") - val params = - msg("params").map(_.asObject.value).getOrElse(JsonObject()) - (method, params) + + val messages = collection.mutable.ListBuffer[(String, JsonObject)]() + var waitingForTask = true + var waitingForResult = true + + while (waitingForTask || waitingForResult) { + val msg = client.expectSomeJson().asObject.value + + msg("id") match { + case Some(json) => + json.asNumber.value.toInt.value shouldEqual requestId + msg("result").value.asNull.value + waitingForResult = false + case None => + val method = + msg("method").map(_.asString.value).getOrElse("error") + val params = + msg("params").map(_.asObject.value).getOrElse(JsonObject()) + messages.addOne((method, params)) + + if (method == "task/finished") waitingForTask = false + } } - val taskStart = messages.find(_._1 == "task/started").value - val taskId = taskStart._2("taskId").value.asString.value - taskStart - ._2("relatedOperation") - .value - .asString - .value shouldEqual "library/preinstall" + val taskStart = messages.find(_._1 == "task/started").value + val taskId = taskStart._2("taskId").value.asString.value + taskStart + ._2("relatedOperation") + .value + .asString + .value shouldEqual "library/preinstall" - taskStart._2("unit").value.asString.value shouldEqual "Bytes" + taskStart._2("unit").value.asString.value shouldEqual "Bytes" - val updates = messages.filter { case (method, params) => - method == "task/progress-update" && - params("taskId").value.asString.value == taskId - } + val updates = messages.filter { case (method, params) => + method == "task/progress-update" && + params("taskId").value.asString.value == taskId + } + + updates should not be empty + updates.head._2("message").value.asString.value should include( + "Downloading" + ) - updates should not be empty - updates.head - ._2("message") - .value - .asString - .value shouldEqual "Download Test" + val cachePath = getTestDirectory.resolve("test_data").resolve("lib") + val readOnlyCache = new LocalReadOnlyRepository(cachePath) + val cachedLibraryRoot = readOnlyCache + .findCachedLibrary( + LibraryName("Foo", "Bar"), + SemVer(1, 0, 0) + ) + .value + + val pkg = + PackageManager.Default.loadPackage(cachedLibraryRoot.toFile).get + pkg.name shouldEqual "Bar" + pkg.listSources.map( + _.file.getName + ) should contain theSameElementsAs Seq("Main.enso") + } } } diff --git a/engine/language-server/src/test/scala/org/enso/languageserver/websocket/json/ProjectSettingsManagerTest.scala b/engine/language-server/src/test/scala/org/enso/languageserver/websocket/json/ProjectSettingsManagerTest.scala index 61ffcbbcee2f..fdabbb17f35e 100644 --- a/engine/language-server/src/test/scala/org/enso/languageserver/websocket/json/ProjectSettingsManagerTest.scala +++ b/engine/language-server/src/test/scala/org/enso/languageserver/websocket/json/ProjectSettingsManagerTest.scala @@ -6,8 +6,8 @@ import org.enso.distribution.FileSystem import java.nio.file.Files class ProjectSettingsManagerTest extends BaseServerTest { - override def beforeAll(): Unit = { - super.beforeAll() + override def beforeEach(): Unit = { + super.beforeEach() val editionsDir = getTestDirectory.resolve("test_data").resolve("editions") Files.createDirectories(editionsDir) diff --git a/engine/launcher/src/main/scala/org/enso/launcher/distribution/DefaultManagers.scala b/engine/launcher/src/main/scala/org/enso/launcher/distribution/DefaultManagers.scala index c9516d414bc0..1c4ca678eabb 100644 --- a/engine/launcher/src/main/scala/org/enso/launcher/distribution/DefaultManagers.scala +++ b/engine/launcher/src/main/scala/org/enso/launcher/distribution/DefaultManagers.scala @@ -1,24 +1,15 @@ package org.enso.launcher.distribution +import org.enso.distribution.locking.ThreadSafeFileLockManager import org.enso.distribution.{ PortableDistributionManager, TemporaryDirectoryManager } -import org.enso.distribution.locking.{ - ResourceManager, - ThreadSafeFileLockManager -} import org.enso.launcher.cli.{ CLIRuntimeVersionManagementUserInterface, GlobalCLIOptions } -import org.enso.runtimeversionmanager.components.{ - GraalVMComponentConfiguration, - InstallerKind, - RuntimeComponentConfiguration, - RuntimeComponentUpdaterFactory, - RuntimeVersionManager -} +import org.enso.runtimeversionmanager.components._ import org.enso.runtimeversionmanager.releases.engine.EngineRepository import org.enso.runtimeversionmanager.releases.graalvm.GraalCEReleaseProvider @@ -39,8 +30,10 @@ object DefaultManagers { lazy val defaultFileLockManager = new ThreadSafeFileLockManager(distributionManager.paths.locks) - /** Default [[ResourceManager]] using the [[defaultFileLockManager]]. */ - lazy val defaultResourceManager = new ResourceManager(defaultFileLockManager) + /** Default [[LauncherResourceManager]] using the [[defaultFileLockManager]]. */ + lazy val defaultResourceManager = new LauncherResourceManager( + defaultFileLockManager + ) /** Default [[TemporaryDirectoryManager]]. */ lazy val temporaryDirectoryManager = diff --git a/engine/launcher/src/main/scala/org/enso/launcher/distribution/LauncherResourceManager.scala b/engine/launcher/src/main/scala/org/enso/launcher/distribution/LauncherResourceManager.scala new file mode 100644 index 000000000000..f1adf7134e00 --- /dev/null +++ b/engine/launcher/src/main/scala/org/enso/launcher/distribution/LauncherResourceManager.scala @@ -0,0 +1,82 @@ +package org.enso.launcher.distribution + +import org.enso.distribution.locking._ + +/** Adds additional capabilities to the [[ResourceManager]], focused on + * synchronizing launcher instances. + */ +class LauncherResourceManager(lockManager: LockManager) + extends ResourceManager(lockManager) { + private var mainLock: Option[Lock] = None + + /** Initializes the [[MainLock]]. */ + def initializeMainLock(): Unit = { + val lock = + lockManager + .tryAcquireLock(MainLock.name, LockType.Shared) + .getOrElse { + throw DistributionIsModifiedError(MainLock.waitMessage) + } + mainLock = Some(lock) + } + + /** Exception that is thrown when the main lock is held exclusively. + * + * This situation means that the current distribution is being installed or + * uninstalled, so it should not be used in the meantime and the application + * has to terminate immediately. + */ + case class DistributionIsModifiedError(message: String) + extends RuntimeException(message) + + /** Acquires an exclusive main lock (first releasing the shared lock), + * ensuring that no other processes using this distribution can be running in + * parallel. + * + * @param waitAction function that is executed if the lock cannot be acquired + * immediately + */ + def acquireExclusiveMainLock(waitAction: () => Unit): Unit = { + mainLock match { + case Some(oldLock) => + oldLock.release() + mainLock = None + case None => + } + + val lock = lockManager.acquireLockWithWaitingAction( + MainLock.name, + LockType.Exclusive, + waitAction + ) + mainLock = Some(lock) + } + + /** Releases the main lock. + * + * Should be called just before the program terminates. It is not an error to + * skip it, as the operating system should unlock all resources after the + * program terminates, but on some platforms this automatic 'garbage + * collection for locks' may take some time, so it is better to release it + * manually. + */ + def releaseMainLock(): Unit = + mainLock match { + case Some(lock) => + lock.release() + mainLock = None + case None => + } + + /** The main lock that is held by all launcher processes. + * + * It is used to ensure that no other processes are running when the + * distribution is being installed or uninstalled. + */ + private case object MainLock extends Resource { + override def name: String = "launcher-main" + override def waitMessage: String = + "Another process is installing or uninstalling the current " + + "distribution. Please wait until that finishes." + } +} diff --git a/engine/launcher/src/main/scala/org/enso/launcher/installation/DistributionInstaller.scala b/engine/launcher/src/main/scala/org/enso/launcher/installation/DistributionInstaller.scala index 05cae7660994..e94b8109d15e 100644 --- a/engine/launcher/src/main/scala/org/enso/launcher/installation/DistributionInstaller.scala +++ b/engine/launcher/src/main/scala/org/enso/launcher/installation/DistributionInstaller.scala @@ -4,7 +4,6 @@ import com.typesafe.scalalogging.Logger import org.enso.cli.{CLIOutput, OS} import org.enso.distribution.FileSystem.PathSyntax import org.enso.distribution.config.GlobalConfigurationManager -import org.enso.distribution.locking.ResourceManager import org.enso.distribution.{ DistributionManager, FileSystem, @@ -12,7 +11,7 @@ import org.enso.distribution.{ } import org.enso.launcher.InfoLogger import org.enso.launcher.cli.{GlobalCLIOptions, InternalOpts, Main} -import org.enso.launcher.distribution.DefaultManagers +import org.enso.launcher.distribution.{DefaultManagers, LauncherResourceManager} import org.enso.launcher.installation.DistributionInstaller.{ BundleAction, IgnoreBundles, @@ -38,7 +37,7 @@ import scala.util.control.NonFatal */ class DistributionInstaller( manager: PortableDistributionManager, - resourceManager: ResourceManager, + resourceManager: LauncherResourceManager, autoConfirm: Boolean, removeOldLauncher: Boolean, bundleActionOption: Option[DistributionInstaller.BundleAction] diff --git a/engine/launcher/src/main/scala/org/enso/launcher/installation/DistributionUninstaller.scala b/engine/launcher/src/main/scala/org/enso/launcher/installation/DistributionUninstaller.scala index dadfcb45b5ab..cf50e9877626 100644 --- a/engine/launcher/src/main/scala/org/enso/launcher/installation/DistributionUninstaller.scala +++ b/engine/launcher/src/main/scala/org/enso/launcher/installation/DistributionUninstaller.scala @@ -5,7 +5,6 @@ import org.apache.commons.io.FileUtils import org.enso.cli.{CLIOutput, OS} import org.enso.distribution.FileSystem.PathSyntax import org.enso.distribution.config.GlobalConfigurationManager -import org.enso.distribution.locking.ResourceManager import org.enso.distribution.{ DistributionManager, FileSystem, @@ -18,7 +17,7 @@ import org.enso.launcher.cli.{ LauncherLogging, Main } -import org.enso.launcher.distribution.DefaultManagers +import org.enso.launcher.distribution.{DefaultManagers, LauncherResourceManager} import java.nio.file.{Files, Path} import scala.util.control.NonFatal @@ -30,7 +29,7 @@ import scala.util.control.NonFatal */ class DistributionUninstaller( manager: PortableDistributionManager, - resourceManager: ResourceManager, + resourceManager: LauncherResourceManager, globalCLIOptions: GlobalCLIOptions ) { private val autoConfirm = globalCLIOptions.autoConfirm diff --git a/engine/launcher/src/test/scala/org/enso/launcher/LauncherConcurrencyTest.scala b/engine/launcher/src/test/scala/org/enso/launcher/LauncherConcurrencyTest.scala new file mode 100644 index 000000000000..18165eb4859d --- /dev/null +++ b/engine/launcher/src/test/scala/org/enso/launcher/LauncherConcurrencyTest.scala @@ -0,0 +1,102 @@ +package org.enso.launcher + +import org.enso.launcher.distribution.LauncherResourceManager +import org.enso.runtimeversionmanager.test.{ + FakeEnvironment, + SlowTestSynchronizer, + TestLocalLockManager +} +import org.enso.testkit.{FlakySpec, RetrySpec, WithTemporaryDirectory} +import org.scalatest.BeforeAndAfterEach +import org.scalatest.concurrent.TimeLimitedTests +import org.scalatest.matchers.should.Matchers +import org.scalatest.time.Span +import org.scalatest.time.SpanSugar.convertIntToGrainOfTime +import org.scalatest.wordspec.AnyWordSpec + +class LauncherConcurrencyTest + extends AnyWordSpec + with Matchers + with WithTemporaryDirectory + with FakeEnvironment + with BeforeAndAfterEach + with TimeLimitedTests + with RetrySpec + with FlakySpec { + + /** This is an upper bound to avoid stalling the tests forever, but particular + * operations have smaller timeouts usually. + */ + val timeLimit: Span = 240.seconds + + "synchronize main lock" taggedAs Retry in { + + /** First two threads start and acquire the shared lock, than the third + * thread tries to acquire an exclusive lock (in practice that will be our + * (un)installer), it should wait for the other threads to finish. When + * the threads see that it started waiting (the waiting notification is + * normally used to tell the user what the application is waiting for), + * the two threads finish and after that the third one is able to acquire + * the exclusive lock. + */ + val sync = new SlowTestSynchronizer + val lockManager = new TestLocalLockManager + def makeNewResourceManager(): LauncherResourceManager = + new LauncherResourceManager(lockManager) + + sync.startThread("t1") { + val resourceManager = makeNewResourceManager() + resourceManager.initializeMainLock() + sync.report("shared-start") + sync.signal("started-1") + sync.waitFor("finish-1") + sync.report("shared-end") + resourceManager.releaseMainLock() + } + + sync.startThread("t2") { + val resourceManager = makeNewResourceManager() + resourceManager.initializeMainLock() + sync.report("shared-start") + sync.signal("started-2") + sync.waitFor("finish-2") + sync.report("shared-end") + resourceManager.releaseMainLock() + } + + sync.waitFor("started-1") + sync.waitFor("started-2") + + sync.startThread("t3") { + val resourceManager = makeNewResourceManager() + resourceManager.initializeMainLock() + sync.report("t3-start") + resourceManager.acquireExclusiveMainLock(() => { + sync.report("t3-wait") + sync.signal("waiting") + }) + sync.report("t3-end") + sync.signal("finish-all") + resourceManager.releaseMainLock() + } + + sync.waitFor("waiting") + Thread.sleep(1000) + + sync.signal("finish-1") + sync.signal("finish-2") + + sync.waitFor("finish-all") + + sync.join() + sync.summarizeReports() shouldEqual Seq( + "shared-start", + "shared-start", + "t3-start", + "t3-wait", + "shared-end", + "shared-end", + "t3-end" + ) + } +} diff --git a/engine/polyglot-api/src/main/scala/org/enso/polyglot/runtime/Runtime.scala b/engine/polyglot-api/src/main/scala/org/enso/polyglot/runtime/Runtime.scala index d4005bf4cf55..8d6ca634c37d 100644 --- a/engine/polyglot-api/src/main/scala/org/enso/polyglot/runtime/Runtime.scala +++ b/engine/polyglot-api/src/main/scala/org/enso/polyglot/runtime/Runtime.scala @@ -26,6 +26,14 @@ object Runtime { @JsonTypeInfo(use = JsonTypeInfo.Id.NAME, property = "type") @JsonSubTypes( Array( + new JsonSubTypes.Type( + value = classOf[Api.Request], + name = "request" + ), + new JsonSubTypes.Type( + value = classOf[Api.Response], + name = "response" + ), new JsonSubTypes.Type( value = classOf[Api.CreateContextRequest], name = "createContextRequest" @@ -213,6 +221,34 @@ object Runtime { new JsonSubTypes.Type( value = classOf[Api.ProgressNotification], name = "progressNotification" + ), + new JsonSubTypes.Type( + value = classOf[Api.AcquireLockRequest], + name = "acquireLockRequest" + ), + new JsonSubTypes.Type( + value = classOf[Api.ReleaseLockRequest], + name = "releaseLockRequest" + ), + new JsonSubTypes.Type( + value = classOf[Api.LockAcquired], + name = "lockAcquired" + ), + new JsonSubTypes.Type( + value = classOf[Api.CannotAcquireImmediately], + name = "cannotAcquireImmediately" + ), + new JsonSubTypes.Type( + value = classOf[Api.LockAcquireFailed], + name = "lockAcquireFailed" + ), + new JsonSubTypes.Type( + value = classOf[Api.LockReleased], + name = "lockReleased" + ), + new JsonSubTypes.Type( + value = classOf[Api.LockReleaseFailed], + name = "lockReleaseFailed" ) ) ) @@ -413,7 +449,7 @@ object Runtime { * @param contextId the context's id. * @param updates a list of updates. */ - case class ExpressionUpdates( + final case class ExpressionUpdates( contextId: ContextId, updates: Set[ExpressionUpdate] ) extends ApiNotification @@ -866,7 +902,7 @@ object Runtime { * @param contextId the context's id * @param diagnostics the list of diagnostic messages */ - case class ExecutionUpdate( + final case class ExecutionUpdate( contextId: ContextId, diagnostics: Seq[ExecutionResult.Diagnostic] ) extends ApiNotification @@ -885,7 +921,7 @@ object Runtime { * @param contextId the context's id * @param failure the error description */ - case class ExecutionFailed( + final case class ExecutionFailed( contextId: ContextId, failure: ExecutionResult.Failure ) extends ApiNotification @@ -904,7 +940,7 @@ object Runtime { * @param visualisationContext a visualisation context * @param data a visualisation data */ - case class VisualisationUpdate( + final case class VisualisationUpdate( visualisationContext: VisualisationContext, data: Array[Byte] ) extends ApiNotification @@ -923,7 +959,7 @@ object Runtime { * @param requestId the request identifier. * @param payload the request payload. */ - case class Request(requestId: Option[RequestId], payload: ApiRequest) + final case class Request(requestId: Option[RequestId], payload: ApiRequest) extends ApiEnvelope object Request { @@ -951,8 +987,10 @@ object Runtime { * @param correlationId request that initiated the response * @param payload response */ - case class Response(correlationId: Option[RequestId], payload: ApiResponse) - extends ApiEnvelope + final case class Response( + correlationId: Option[RequestId], + payload: ApiResponse + ) extends ApiEnvelope object Response { @@ -979,27 +1017,31 @@ object Runtime { * * @param contextId the newly created context's id. */ - case class CreateContextRequest(contextId: ContextId) extends ApiRequest + final case class CreateContextRequest(contextId: ContextId) + extends ApiRequest /** A response sent from the server upon handling the [[CreateContextRequest]] * * @param contextId the newly created context's id. */ - case class CreateContextResponse(contextId: ContextId) extends ApiResponse + final case class CreateContextResponse(contextId: ContextId) + extends ApiResponse /** A Request sent from the client to the runtime server, to destroy an * execution context with a given id. * * @param contextId the destroyed context's id. */ - case class DestroyContextRequest(contextId: ContextId) extends ApiRequest + final case class DestroyContextRequest(contextId: ContextId) + extends ApiRequest /** A success response sent from the server upon handling the * [[DestroyContextRequest]] * * @param contextId the destroyed context's id */ - case class DestroyContextResponse(contextId: ContextId) extends ApiResponse + final case class DestroyContextResponse(contextId: ContextId) + extends ApiResponse /** A Request sent from the client to the runtime server, to move * the execution context to a new location deeper down the stack. @@ -1007,27 +1049,31 @@ object Runtime { * @param contextId the context's id. * @param stackItem an item that should be pushed on the stack. */ - case class PushContextRequest(contextId: ContextId, stackItem: StackItem) - extends ApiRequest + final case class PushContextRequest( + contextId: ContextId, + stackItem: StackItem + ) extends ApiRequest /** A response sent from the server upon handling the [[PushContextRequest]] * * @param contextId the context's id. */ - case class PushContextResponse(contextId: ContextId) extends ApiResponse + final case class PushContextResponse(contextId: ContextId) + extends ApiResponse /** A Request sent from the client to the runtime server, to move * the execution context up the stack. * * @param contextId the context's id. */ - case class PopContextRequest(contextId: ContextId) extends ApiRequest + final case class PopContextRequest(contextId: ContextId) extends ApiRequest /** A response sent from the server upon handling the [[PopContextRequest]] * * @param contextId the context's id. */ - case class PopContextResponse(contextId: ContextId) extends ApiResponse + final case class PopContextResponse(contextId: ContextId) + extends ApiResponse /** A Request sent from the client to the runtime server, to recompute * the execution context. @@ -1036,7 +1082,7 @@ object Runtime { * @param expressions the selector specifying which expressions should be * recomputed. */ - case class RecomputeContextRequest( + final case class RecomputeContextRequest( contextId: ContextId, expressions: Option[InvalidatedExpressions] ) extends ApiRequest @@ -1046,26 +1092,27 @@ object Runtime { * * @param contextId the context's id. */ - case class RecomputeContextResponse(contextId: ContextId) + final case class RecomputeContextResponse(contextId: ContextId) extends ApiResponse /** An error response signifying a non-existent context. * * @param contextId the context's id */ - case class ContextNotExistError(contextId: ContextId) extends Error + final case class ContextNotExistError(contextId: ContextId) extends Error /** Signals that a module cannot be found. * * @param moduleName the module name */ - case class ModuleNotFound(moduleName: String) extends Error + final case class ModuleNotFound(moduleName: String) extends Error /** Signals that execution of a context completed. * * @param contextId the context's id */ - case class ExecutionComplete(contextId: ContextId) extends ApiNotification + final case class ExecutionComplete(contextId: ContextId) + extends ApiNotification /** Signals that an expression specified in a [[AttachVisualisation]] or * a [[ModifyVisualisation]] cannot be evaluated. @@ -1073,7 +1120,7 @@ object Runtime { * @param message the reason of the failure * @param failure the detailed information about the failure */ - case class VisualisationExpressionFailed( + final case class VisualisationExpressionFailed( message: String, failure: Option[ExecutionResult.Diagnostic] ) extends Error @@ -1096,7 +1143,7 @@ object Runtime { * @param message the reason of the failure * @param diagnostic the detailed information about the failure */ - case class VisualisationEvaluationFailed( + final case class VisualisationEvaluationFailed( contextId: ContextId, visualisationId: VisualisationId, expressionId: ExpressionId, @@ -1117,19 +1164,19 @@ object Runtime { } /** Signals that visualisation cannot be found. */ - case class VisualisationNotFound() extends Error + final case class VisualisationNotFound() extends Error /** An error response signifying that stack is empty. * * @param contextId the context's id */ - case class EmptyStackError(contextId: ContextId) extends Error + final case class EmptyStackError(contextId: ContextId) extends Error /** An error response signifying that stack item is invalid. * * @param contextId the context's id */ - case class InvalidStackItemError(contextId: ContextId) extends Error + final case class InvalidStackItemError(contextId: ContextId) extends Error /** A notification sent to the server about switching a file to literal * contents. @@ -1137,7 +1184,7 @@ object Runtime { * @param path the file being moved to memory. * @param contents the current file contents. */ - case class OpenFileNotification( + final case class OpenFileNotification( path: File, contents: String ) extends ApiRequest @@ -1157,7 +1204,7 @@ object Runtime { * @param path the file being edited. * @param edits the diffs to apply to the contents. */ - case class EditFileNotification(path: File, edits: Seq[TextEdit]) + final case class EditFileNotification(path: File, edits: Seq[TextEdit]) extends ApiRequest with ToLogString { @@ -1174,7 +1221,7 @@ object Runtime { * * @param path the file being closed. */ - case class CloseFileNotification(path: File) + final case class CloseFileNotification(path: File) extends ApiRequest with ToLogString { @@ -1187,7 +1234,7 @@ object Runtime { * initialization. Any messages sent to the server before receiving this * message will be dropped. */ - case class InitializedNotification() extends ApiResponse + final case class InitializedNotification() extends ApiResponse /** A request sent from the client to the runtime server, to create a new * visualisation for an expression identified by `expressionId`. @@ -1197,7 +1244,7 @@ object Runtime { * @param visualisationConfig a configuration object for properties of the * visualisation */ - case class AttachVisualisation( + final case class AttachVisualisation( visualisationId: VisualisationId, expressionId: ExpressionId, visualisationConfig: VisualisationConfiguration @@ -1215,7 +1262,7 @@ object Runtime { /** Signals that attaching a visualisation has succeeded. */ - case class VisualisationAttached() extends ApiResponse + final case class VisualisationAttached() extends ApiResponse /** A request sent from the client to the runtime server, to detach a * visualisation from an expression identified by `expressionId`. @@ -1224,7 +1271,7 @@ object Runtime { * @param visualisationId an identifier of a visualisation * @param expressionId an identifier of an expression which is visualised */ - case class DetachVisualisation( + final case class DetachVisualisation( contextId: ContextId, visualisationId: VisualisationId, expressionId: ExpressionId @@ -1232,7 +1279,7 @@ object Runtime { /** Signals that detaching a visualisation has succeeded. */ - case class VisualisationDetached() extends ApiResponse + final case class VisualisationDetached() extends ApiResponse /** A request sent from the client to the runtime server, to modify a * visualisation identified by `visualisationId`. @@ -1241,7 +1288,7 @@ object Runtime { * @param visualisationConfig a configuration object for properties of the * visualisation */ - case class ModifyVisualisation( + final case class ModifyVisualisation( visualisationId: VisualisationId, visualisationConfig: VisualisationConfiguration ) extends ToLogString @@ -1257,15 +1304,15 @@ object Runtime { /** Signals that a visualisation modification has succeeded. */ - case class VisualisationModified() extends ApiResponse + final case class VisualisationModified() extends ApiResponse /** A request to shut down the runtime server. */ - case class ShutDownRuntimeServer() extends ApiRequest + final case class ShutDownRuntimeServer() extends ApiRequest /** Signals that the runtime server has been shut down. */ - case class RuntimeServerShutDown() extends ApiResponse + final case class RuntimeServerShutDown() extends ApiResponse /** A request for project renaming. * @@ -1273,7 +1320,7 @@ object Runtime { * @param oldName the old project name * @param newName the new project name */ - case class RenameProject( + final case class RenameProject( namespace: String, oldName: String, newName: String @@ -1284,7 +1331,7 @@ object Runtime { * @param namespace the namespace of the project * @param newName the new project name */ - case class ProjectRenamed(namespace: String, newName: String) + final case class ProjectRenamed(namespace: String, newName: String) extends ApiResponse /** A notification about the changes in the suggestions database. @@ -1295,7 +1342,7 @@ object Runtime { * @param exports the list of re-exported symbols * @param updates the list of suggestions extracted from module */ - case class SuggestionsDatabaseModuleUpdateNotification( + final case class SuggestionsDatabaseModuleUpdateNotification( module: String, version: ContentVersion, actions: Vector[SuggestionsDatabaseAction], @@ -1316,30 +1363,30 @@ object Runtime { } /** A request to invalidate the indexed flag of the modules. */ - case class InvalidateModulesIndexRequest() extends ApiRequest + final case class InvalidateModulesIndexRequest() extends ApiRequest /** Signals that the module indexes has been invalidated. */ - case class InvalidateModulesIndexResponse() extends ApiResponse + final case class InvalidateModulesIndexResponse() extends ApiResponse /** A request to verify the modules in the suggestions database. * * @param modules the list of modules */ - case class VerifyModulesIndexRequest(modules: Seq[String]) + final case class VerifyModulesIndexRequest(modules: Seq[String]) extends ApiRequest /** A response to the module verification request. * * @param remove the list of modules to remove from suggestions database. */ - case class VerifyModulesIndexResponse(remove: Seq[String]) + final case class VerifyModulesIndexResponse(remove: Seq[String]) extends ApiResponse /** A request to return info needed to import the suggestion. * * @param suggestion the suggestion to import */ - case class ImportSuggestionRequest(suggestion: Suggestion) + final case class ImportSuggestionRequest(suggestion: Suggestion) extends ApiRequest with ToLogString { @@ -1354,20 +1401,20 @@ object Runtime { * @param symbol the resolved symbol * @param exports the list of exports of the symbol */ - case class ImportSuggestionResponse( + final case class ImportSuggestionResponse( module: String, symbol: String, exports: Seq[Export] ) extends ApiResponse /** A request for the type hierarchy graph. */ - case class GetTypeGraphRequest() extends ApiRequest + final case class GetTypeGraphRequest() extends ApiRequest /** The result of the type graph request. * * @param graph the graph. */ - case class GetTypeGraphResponse(graph: TypeGraph) extends ApiResponse + final case class GetTypeGraphResponse(graph: TypeGraph) extends ApiResponse /** Signals that a new library has been imported, which means its content * root should be registered. @@ -1378,7 +1425,7 @@ object Runtime { * @param location location on disk of the project root belonging to the * loaded library */ - case class LibraryLoaded( + final case class LibraryLoaded( namespace: String, name: String, version: String, @@ -1389,7 +1436,7 @@ object Runtime { * * @param payload the actual update contained within this notification */ - case class ProgressNotification( + final case class ProgressNotification( payload: ProgressNotification.NotificationType ) extends ApiNotification @@ -1419,43 +1466,82 @@ object Runtime { ) extends NotificationType } + /** A request sent from the runtime to acquire a lock. + * + * @param resourceName name of the resource identifying the lock + * @param exclusive whether the lock should be exclusive (if false, a + * shared lock is acquired, if supported) + * @param returnImmediately if set to true, will immediately return even if + * the lock cannot be acquired; if set to false, + * the response to the request will come only once + * the lock has been successfully acquired (which + * may take an arbitrarily large amount of time) + */ + final case class AcquireLockRequest( + resourceName: String, + exclusive: Boolean, + returnImmediately: Boolean + ) extends ApiRequest + + /** A response indicating that the lock has been successfully acquired. + * + * @param lockId a unique identifier of the lock that can be used to + * release it + */ + final case class LockAcquired(lockId: UUID) extends ApiResponse + + /** A response indicating that the lock could not be acquired immediately. + * + * It is only sent if the request had `returnImmediately` set to true. + */ + final case class CannotAcquireImmediately() extends ApiResponse + + /** A response indicating a general failure to acquire the lock. + * + * @param errorMessage message associated with the exception that caused + * this failure + */ + final case class LockAcquireFailed(errorMessage: String) extends ApiResponse + + /** A request sent from the runtime to release a lock. + * + * @param lockId the identifier of the lock to release, as specified in the + * [[LockAcquired]] response + */ + final case class ReleaseLockRequest(lockId: UUID) extends ApiRequest + + /** A response indicating that the lock has been successfully released. */ + final case class LockReleased() extends ApiResponse + + /** A response indicating a general failure to release the lock. + * + * @param errorMessage message associated with the exception that caused + * this failure + */ + final case class LockReleaseFailed(errorMessage: String) extends ApiResponse + private lazy val mapper = { val factory = new CBORFactory() val mapper = new ObjectMapper(factory) with ScalaObjectMapper mapper.registerModule(DefaultScalaModule) } - /** Serializes a Request into a byte buffer. - * - * @param message the message to serialize. - * @return the serialized version of the message. - */ - def serialize(message: Request): ByteBuffer = - ByteBuffer.wrap(mapper.writeValueAsBytes(message)) - - /** Serializes a Response into a byte buffer. + /** Serializes an ApiEnvelope into a byte buffer. * * @param message the message to serialize. * @return the serialized version of the message. */ - def serialize(message: Response): ByteBuffer = + def serialize(message: ApiEnvelope): ByteBuffer = ByteBuffer.wrap(mapper.writeValueAsBytes(message)) - /** Deserializes a byte buffer into a Request message. - * - * @param bytes the buffer to deserialize - * @return the deserialized message, if the byte buffer can be deserialized. - */ - def deserializeRequest(bytes: ByteBuffer): Option[Request] = - Try(mapper.readValue(bytes.array(), classOf[Request])).toOption - - /** Deserializes a byte buffer into a Response message. + /** Deserializes a byte buffer into an ApiEnvelope, which can be a Request + * or a Response. * * @param bytes the buffer to deserialize * @return the deserialized message, if the byte buffer can be deserialized. */ - def deserializeResponse(bytes: ByteBuffer): Option[Response] = - Try(mapper.readValue(bytes.array(), classOf[Response])).toOption + def deserializeApiEnvelope(bytes: ByteBuffer): Option[ApiEnvelope] = + Try(mapper.readValue(bytes.array(), classOf[ApiEnvelope])).toOption } } diff --git a/engine/runtime/src/main/java/org/enso/interpreter/Language.java b/engine/runtime/src/main/java/org/enso/interpreter/Language.java index f4801d235289..057dd3e24a29 100644 --- a/engine/runtime/src/main/java/org/enso/interpreter/Language.java +++ b/engine/runtime/src/main/java/org/enso/interpreter/Language.java @@ -4,13 +4,17 @@ import com.oracle.truffle.api.InstrumentInfo; import com.oracle.truffle.api.Truffle; import com.oracle.truffle.api.TruffleLanguage; +import com.oracle.truffle.api.TruffleLogger; import com.oracle.truffle.api.debug.DebuggerTags; import com.oracle.truffle.api.instrumentation.ProvidedTags; import com.oracle.truffle.api.instrumentation.StandardTags; import com.oracle.truffle.api.nodes.RootNode; +import org.enso.distribution.DistributionManager; +import org.enso.distribution.Environment; +import org.enso.distribution.locking.LockManager; +import org.enso.distribution.locking.ThreadSafeFileLockManager; import org.enso.interpreter.epb.EpbLanguage; import org.enso.interpreter.instrument.IdExecutionInstrument; -import org.enso.interpreter.instrument.NotificationHandler; import org.enso.interpreter.instrument.NotificationHandler.Forwarder; import org.enso.interpreter.instrument.NotificationHandler.TextMode$; import org.enso.interpreter.node.ProgramRootNode; @@ -18,6 +22,7 @@ import org.enso.interpreter.runtime.tag.IdentifiedTag; import org.enso.interpreter.service.ExecutionService; import org.enso.interpreter.util.FileDetector; +import org.enso.lockmanager.client.ConnectedLockManager; import org.enso.polyglot.LanguageInfo; import org.enso.polyglot.RuntimeOptions; import org.graalvm.options.OptionDescriptors; @@ -70,11 +75,33 @@ protected Context createContext(Env env) { notificationHandler.addListener(TextMode$.MODULE$); } - Context context = new Context(this, getLanguageHome(), env, notificationHandler); + TruffleLogger logger = env.getLogger(Language.class); + + var environment = new Environment() {}; + var distributionManager = new DistributionManager(environment); + + LockManager lockManager; + ConnectedLockManager connectedLockManager = null; + + if (isInteractiveMode) { + logger.finest( + "Detected interactive mode, will try to connect to a lock manager managed by it."); + connectedLockManager = new ConnectedLockManager(); + lockManager = connectedLockManager; + } else { + logger.finest("Detected text mode, using a standalone lock manager."); + lockManager = new ThreadSafeFileLockManager(distributionManager.paths().locks()); + } + + Context context = + new Context( + this, getLanguageHome(), env, notificationHandler, lockManager, distributionManager); InstrumentInfo idValueListenerInstrument = env.getInstruments().get(IdExecutionInstrument.INSTRUMENT_ID); idExecutionInstrument = env.lookup(idValueListenerInstrument, IdExecutionInstrument.class); - env.registerService(new ExecutionService(context, idExecutionInstrument, notificationHandler)); + env.registerService( + new ExecutionService( + context, idExecutionInstrument, notificationHandler, connectedLockManager)); return context; } diff --git a/engine/runtime/src/main/java/org/enso/interpreter/instrument/RuntimeServerInstrument.java b/engine/runtime/src/main/java/org/enso/interpreter/instrument/RuntimeServerInstrument.java index ad0b5dfcce7f..68a94f2c855c 100644 --- a/engine/runtime/src/main/java/org/enso/interpreter/instrument/RuntimeServerInstrument.java +++ b/engine/runtime/src/main/java/org/enso/interpreter/instrument/RuntimeServerInstrument.java @@ -5,19 +5,17 @@ import com.oracle.truffle.api.instrumentation.EventBinding; import com.oracle.truffle.api.instrumentation.TruffleInstrument; import com.oracle.truffle.api.nodes.LanguageInfo; +import java.io.IOException; +import java.net.URI; +import java.util.Arrays; import org.enso.interpreter.service.ExecutionService; -import org.enso.polyglot.*; +import org.enso.polyglot.RuntimeServerInfo; import org.graalvm.options.OptionDescriptor; import org.graalvm.options.OptionDescriptors; import org.graalvm.options.OptionKey; import org.graalvm.polyglot.io.MessageEndpoint; import org.graalvm.polyglot.io.MessageTransport; -import java.io.IOException; -import java.net.URI; -import java.util.Arrays; -import java.util.Collections; - /** * An instrument exposing a server for other services to connect to, in order to control the current * language context and request executions. @@ -93,6 +91,11 @@ protected void onCreate(Env env) { env.startServer(URI.create(RuntimeServerInfo.URI), handler.endpoint()); if (client != null) { handler.endpoint().setClient(client); + } else { + env.getLogger(RuntimeServerInstrument.class) + .warning( + "The client endpoint has not been initialized. The Runtime " + + "Server Instrument may very likely not function properly."); } } catch (MessageTransport.VetoException | IOException e) { throw new RuntimeException(e); diff --git a/engine/runtime/src/main/java/org/enso/interpreter/runtime/Context.java b/engine/runtime/src/main/java/org/enso/interpreter/runtime/Context.java index 77fc5bd69424..fcd07e79eb3d 100644 --- a/engine/runtime/src/main/java/org/enso/interpreter/runtime/Context.java +++ b/engine/runtime/src/main/java/org/enso/interpreter/runtime/Context.java @@ -16,8 +16,7 @@ import org.enso.compiler.PackageRepository; import org.enso.compiler.data.CompilerConfig; import org.enso.distribution.DistributionManager; -import org.enso.distribution.Environment; -import org.enso.distribution.locking.ThreadSafeFileLockManager; +import org.enso.distribution.locking.LockManager; import org.enso.editions.LibraryName; import org.enso.interpreter.Language; import org.enso.interpreter.OptionsHelper; @@ -59,7 +58,8 @@ public class Context { private final CompilerConfig compilerConfig; private final NotificationHandler notificationHandler; private final TruffleLogger logger = TruffleLogger.getLogger(LanguageInfo.ID, Context.class); - private @CompilationFinal DistributionManager distributionManager; + private final DistributionManager distributionManager; + private final LockManager lockManager; /** * Creates a new Enso context. @@ -68,9 +68,16 @@ public class Context { * @param home language home * @param environment the execution environment of the {@link TruffleLanguage} * @param notificationHandler a handler for notifications + * @param lockManager the lock manager instance + * @param distributionManager a distribution manager */ public Context( - Language language, String home, Env environment, NotificationHandler notificationHandler) { + Language language, + String home, + Env environment, + NotificationHandler notificationHandler, + LockManager lockManager, + DistributionManager distributionManager) { this.language = language; this.environment = environment; this.out = new PrintStream(environment.out()); @@ -86,6 +93,8 @@ public Context( this.home = home; this.builtins = new Builtins(this); this.notificationHandler = notificationHandler; + this.lockManager = lockManager; + this.distributionManager = distributionManager; } /** Perform expensive initialization logic for the context. */ @@ -107,13 +116,6 @@ public void initialize() { var languageHome = OptionsHelper.getLanguageHomeOverride(environment).or(() -> Optional.ofNullable(home)); - - var environment = new Environment() {}; - this.distributionManager = new DistributionManager(environment); - - // TODO [RW] Once #1890 is implemented, this will need to connect to the Language Server's - // LockManager. - var lockManager = new ThreadSafeFileLockManager(distributionManager.paths().locks()); var resourceManager = new org.enso.distribution.locking.ResourceManager(lockManager); packageRepository = diff --git a/engine/runtime/src/main/java/org/enso/interpreter/service/ExecutionService.java b/engine/runtime/src/main/java/org/enso/interpreter/service/ExecutionService.java index ab195504648b..decf3a56d6a5 100644 --- a/engine/runtime/src/main/java/org/enso/interpreter/service/ExecutionService.java +++ b/engine/runtime/src/main/java/org/enso/interpreter/service/ExecutionService.java @@ -39,6 +39,7 @@ import org.enso.interpreter.service.error.ModuleNotFoundException; import org.enso.interpreter.service.error.ModuleNotFoundForFileException; import org.enso.interpreter.service.error.SourceNotFoundException; +import org.enso.lockmanager.client.ConnectedLockManager; import org.enso.polyglot.LanguageInfo; import org.enso.polyglot.MethodNames; import org.enso.text.buffer.Rope; @@ -57,6 +58,7 @@ public class ExecutionService { private final NotificationHandler.Forwarder notificationForwarder; private final InteropLibrary interopLibrary = InteropLibrary.getFactory().getUncached(); private final TruffleLogger logger = TruffleLogger.getLogger(LanguageInfo.ID); + private final ConnectedLockManager connectedLockManager; /** * Creates a new instance of this service. @@ -64,14 +66,19 @@ public class ExecutionService { * @param context the language context to use. * @param idExecutionInstrument an instance of the {@link IdExecutionInstrument} to use in the * course of executions. + * @param notificationForwarder a forwarder of notifications, used to communicate with the user + * @param connectedLockManager a connected lock manager (if it is in use) that should be connected + * to the language server, or null */ public ExecutionService( Context context, IdExecutionInstrument idExecutionInstrument, - NotificationHandler.Forwarder notificationForwarder) { + NotificationHandler.Forwarder notificationForwarder, + ConnectedLockManager connectedLockManager) { this.idExecutionInstrument = idExecutionInstrument; this.context = context; this.notificationForwarder = notificationForwarder; + this.connectedLockManager = connectedLockManager; } /** @return the language context. */ @@ -104,6 +111,14 @@ private FunctionCallInstrumentationNode.FunctionCall prepareFunctionCall( public void initializeLanguageServerConnection(Endpoint endpoint) { var notificationHandler = new NotificationHandler.InteractiveMode(endpoint); notificationForwarder.addListener(notificationHandler); + + if (connectedLockManager != null) { + connectedLockManager.connect(endpoint); + } else { + logger.warning( + "ConnectedLockManager was not initialized, even though a Language Server connection has been established. " + + "This may result in synchronization errors."); + } } /** diff --git a/engine/runtime/src/main/scala/org/enso/compiler/PackageRepository.scala b/engine/runtime/src/main/scala/org/enso/compiler/PackageRepository.scala index d6e8cca0bdc5..480d942cc440 100644 --- a/engine/runtime/src/main/scala/org/enso/compiler/PackageRepository.scala +++ b/engine/runtime/src/main/scala/org/enso/compiler/PackageRepository.scala @@ -223,7 +223,7 @@ object PackageRepository { .map { case ResolvingLibraryProvider.Error.NotResolved(details) => Error.PackageCouldNotBeResolved(details) - case ResolvingLibraryProvider.Error.DownloadFailed(reason) => + case ResolvingLibraryProvider.Error.DownloadFailed(_, reason) => Error.PackageDownloadFailed(reason) case ResolvingLibraryProvider.Error.RequestedLocalLibraryDoesNotExist => Error.PackageLoadingError( diff --git a/engine/runtime/src/main/scala/org/enso/interpreter/instrument/Handler.scala b/engine/runtime/src/main/scala/org/enso/interpreter/instrument/Handler.scala index 57d5337bd7e8..2fe3d22d3af3 100644 --- a/engine/runtime/src/main/scala/org/enso/interpreter/instrument/Handler.scala +++ b/engine/runtime/src/main/scala/org/enso/interpreter/instrument/Handler.scala @@ -7,15 +7,30 @@ import org.enso.interpreter.instrument.execution.{ CommandProcessor } import org.enso.interpreter.service.ExecutionService -import org.enso.polyglot.runtime.Runtime.Api +import org.enso.lockmanager.client.{ + RuntimeServerConnectionEndpoint, + RuntimeServerRequestHandler +} +import org.enso.polyglot.runtime.Runtime.{Api, ApiRequest, ApiResponse} import org.graalvm.polyglot.io.MessageEndpoint import java.nio.ByteBuffer +import scala.concurrent.Future /** A message endpoint implementation used by the * [[org.enso.interpreter.instrument.RuntimeServerInstrument]]. */ -class Endpoint(handler: Handler) extends MessageEndpoint { +class Endpoint(handler: Handler) + extends MessageEndpoint + with RuntimeServerConnectionEndpoint { + + /** A helper endpoint that is used for handling requests sent to the Language + * Server. + */ + private val reverseRequestEndpoint = new RuntimeServerRequestHandler { + override def sendToClient(request: Api.Request): Unit = + client.sendBinary(Api.serialize(request)) + } var client: MessageEndpoint = _ @@ -32,10 +47,19 @@ class Endpoint(handler: Handler) extends MessageEndpoint { def sendToClient(msg: Api.Response): Unit = client.sendBinary(Api.serialize(msg)) + /** Sends a request to the connected client and expects a reply. */ + override def sendRequest(msg: ApiRequest): Future[ApiResponse] = + reverseRequestEndpoint.sendRequest(msg) + override def sendText(text: String): Unit = {} override def sendBinary(data: ByteBuffer): Unit = - Api.deserializeRequest(data).foreach(handler.onMessage) + Api.deserializeApiEnvelope(data).foreach { + case request: Api.Request => + handler.onMessage(request) + case response: Api.Response => + reverseRequestEndpoint.onResponseReceived(response) + } override def sendPing(data: ByteBuffer): Unit = client.sendPong(data) @@ -83,19 +107,15 @@ final class Handler { * * @param request the message to handle. */ - def onMessage(request: Api.Request): Unit = { - request.payload match { - case Api.ShutDownRuntimeServer() => - commandProcessor.stop() - endpoint.sendToClient( - Api.Response(request.requestId, Api.RuntimeServerShutDown()) - ) - - case _ => - val cmd = CommandFactory.createCommand(request) - commandProcessor.invoke(cmd) - } + def onMessage(request: Api.Request): Unit = request match { + case Api.Request(requestId, Api.ShutDownRuntimeServer()) => + commandProcessor.stop() + endpoint.sendToClient( + Api.Response(requestId, Api.RuntimeServerShutDown()) + ) + case request: Api.Request => + val cmd = CommandFactory.createCommand(request) + commandProcessor.invoke(cmd) } - } diff --git a/engine/runtime/src/main/scala/org/enso/interpreter/instrument/command/CommandFactory.scala b/engine/runtime/src/main/scala/org/enso/interpreter/instrument/command/CommandFactory.scala index 40c4514ba6a6..e03f720937cc 100644 --- a/engine/runtime/src/main/scala/org/enso/interpreter/instrument/command/CommandFactory.scala +++ b/engine/runtime/src/main/scala/org/enso/interpreter/instrument/command/CommandFactory.scala @@ -60,6 +60,11 @@ object CommandFactory { throw new IllegalArgumentException( "ShutDownRuntimeServer request is not convertible to command object" ) + + case _: Api.AcquireLockRequest | _: Api.ReleaseLockRequest => + throw new IllegalArgumentException( + "Lock-related requests are not meant to be handled by the runtime." + ) } } diff --git a/engine/runtime/src/test/scala/org/enso/interpreter/test/instrument/RuntimeErrorsTest.scala b/engine/runtime/src/test/scala/org/enso/interpreter/test/instrument/RuntimeErrorsTest.scala index bd19f864418e..b115194f1ffb 100644 --- a/engine/runtime/src/test/scala/org/enso/interpreter/test/instrument/RuntimeErrorsTest.scala +++ b/engine/runtime/src/test/scala/org/enso/interpreter/test/instrument/RuntimeErrorsTest.scala @@ -1,10 +1,7 @@ package org.enso.interpreter.test.instrument -import java.io.{ByteArrayOutputStream, File} -import java.nio.ByteBuffer -import java.nio.file.{Files, Paths} -import java.util.UUID -import java.util.concurrent.{LinkedBlockingQueue, TimeUnit} +import org.enso.distribution.FileSystem +import org.enso.distribution.locking.ThreadSafeFileLockManager import org.enso.interpreter.instrument.execution.Timer import org.enso.interpreter.runtime.`type`.Constants import org.enso.interpreter.test.Metadata @@ -15,11 +12,15 @@ import org.enso.text.editing.model import org.enso.text.editing.model.TextEdit import org.enso.text.{ContentVersion, Sha3_224VersionCalculator} import org.graalvm.polyglot.Context -import org.graalvm.polyglot.io.MessageEndpoint import org.scalatest.BeforeAndAfterEach import org.scalatest.flatspec.AnyFlatSpec import org.scalatest.matchers.should.Matchers +import java.io.{ByteArrayOutputStream, File} +import java.nio.file.{Files, Path, Paths} +import java.util.UUID +import java.util.concurrent.{LinkedBlockingQueue, TimeUnit} + @scala.annotation.nowarn("msg=multiarg infix syntax") class RuntimeErrorsTest extends AnyFlatSpec @@ -37,15 +38,18 @@ class RuntimeErrorsTest var context: TestContext = _ class TestContext(packageName: String) { - var endPoint: MessageEndpoint = _ val messageQueue: LinkedBlockingQueue[Api.Response] = new LinkedBlockingQueue() - val tmpDir: File = Files.createTempDirectory("enso-test-packages").toFile + val tmpDir: Path = Files.createTempDirectory("enso-test-packages") + sys.addShutdownHook(FileSystem.removeDirectoryIfExists(tmpDir)) + val lockManager = new ThreadSafeFileLockManager(tmpDir.resolve("locks")) + val runtimeServerEmulator = + new RuntimeServerEmulator(messageQueue, lockManager) val pkg: Package[File] = PackageManager.Default.create( - tmpDir, + tmpDir.toFile, packageName, namespace = "Enso_Test" ) @@ -67,23 +71,7 @@ class RuntimeErrorsTest Paths.get("../../distribution/component").toFile.getAbsolutePath ) .out(out) - .serverTransport { (uri, peer) => - if (uri.toString == RuntimeServerInfo.URI) { - endPoint = peer - new MessageEndpoint { - override def sendText(text: String): Unit = {} - - override def sendBinary(data: ByteBuffer): Unit = - Api.deserializeResponse(data).foreach(messageQueue.add) - - override def sendPing(data: ByteBuffer): Unit = {} - - override def sendPong(data: ByteBuffer): Unit = {} - - override def sendClose(): Unit = {} - } - } else null - } + .serverTransport(runtimeServerEmulator.makeServerTransport) .build() ) executionContext.context.initialize(LanguageInfo.ID) @@ -106,7 +94,7 @@ class RuntimeErrorsTest Files.write(file.toPath, contents.getBytes).toFile } - def send(msg: Api.Request): Unit = endPoint.sendBinary(Api.serialize(msg)) + def send(msg: Api.Request): Unit = runtimeServerEmulator.sendToRuntime(msg) def receiveNone: Option[Api.Response] = { Option(messageQueue.poll()) diff --git a/engine/runtime/src/test/scala/org/enso/interpreter/test/instrument/RuntimeInstrumentTest.scala b/engine/runtime/src/test/scala/org/enso/interpreter/test/instrument/RuntimeInstrumentTest.scala index 376b661a9234..278694d0975f 100644 --- a/engine/runtime/src/test/scala/org/enso/interpreter/test/instrument/RuntimeInstrumentTest.scala +++ b/engine/runtime/src/test/scala/org/enso/interpreter/test/instrument/RuntimeInstrumentTest.scala @@ -1,10 +1,7 @@ package org.enso.interpreter.test.instrument -import java.io.{ByteArrayOutputStream, File} -import java.nio.ByteBuffer -import java.nio.file.{Files, Paths} -import java.util.UUID -import java.util.concurrent.{LinkedBlockingQueue, TimeUnit} +import org.enso.distribution.FileSystem +import org.enso.distribution.locking.ThreadSafeFileLockManager import org.enso.interpreter.instrument.execution.Timer import org.enso.interpreter.runtime.`type`.Constants import org.enso.interpreter.test.Metadata @@ -13,11 +10,15 @@ import org.enso.polyglot._ import org.enso.polyglot.runtime.Runtime.Api import org.enso.text.{ContentVersion, Sha3_224VersionCalculator} import org.graalvm.polyglot.Context -import org.graalvm.polyglot.io.MessageEndpoint import org.scalatest.BeforeAndAfterEach import org.scalatest.flatspec.AnyFlatSpec import org.scalatest.matchers.should.Matchers +import java.io.{ByteArrayOutputStream, File} +import java.nio.file.{Files, Path, Paths} +import java.util.UUID +import java.util.concurrent.{LinkedBlockingQueue, TimeUnit} + @scala.annotation.nowarn("msg=multiarg infix syntax") class RuntimeInstrumentTest extends AnyFlatSpec @@ -35,14 +36,17 @@ class RuntimeInstrumentTest var context: TestContext = _ class TestContext(packageName: String) { - var endPoint: MessageEndpoint = _ val messageQueue: LinkedBlockingQueue[Api.Response] = new LinkedBlockingQueue() - val tmpDir: File = Files.createTempDirectory("enso-test-packages").toFile + val tmpDir: Path = Files.createTempDirectory("enso-test-packages") + sys.addShutdownHook(FileSystem.removeDirectoryIfExists(tmpDir)) + val lockManager = new ThreadSafeFileLockManager(tmpDir.resolve("locks")) + val runtimeServerEmulator = + new RuntimeServerEmulator(messageQueue, lockManager) val pkg: Package[File] = - PackageManager.Default.create(tmpDir, packageName, "Enso_Test") + PackageManager.Default.create(tmpDir.toFile, packageName, "Enso_Test") val out: ByteArrayOutputStream = new ByteArrayOutputStream() val executionContext = new PolyglotContext( Context @@ -61,23 +65,7 @@ class RuntimeInstrumentTest Paths.get("../../distribution/component").toFile.getAbsolutePath ) .out(out) - .serverTransport { (uri, peer) => - if (uri.toString == RuntimeServerInfo.URI) { - endPoint = peer - new MessageEndpoint { - override def sendText(text: String): Unit = {} - - override def sendBinary(data: ByteBuffer): Unit = - Api.deserializeResponse(data).foreach(messageQueue.add) - - override def sendPing(data: ByteBuffer): Unit = {} - - override def sendPong(data: ByteBuffer): Unit = {} - - override def sendClose(): Unit = {} - } - } else null - } + .serverTransport(runtimeServerEmulator.makeServerTransport) .build() ) executionContext.context.initialize(LanguageInfo.ID) @@ -100,7 +88,7 @@ class RuntimeInstrumentTest Files.write(file.toPath, contents.getBytes).toFile } - def send(msg: Api.Request): Unit = endPoint.sendBinary(Api.serialize(msg)) + def send(msg: Api.Request): Unit = runtimeServerEmulator.sendToRuntime(msg) def receiveNone: Option[Api.Response] = { Option(messageQueue.poll()) diff --git a/engine/runtime/src/test/scala/org/enso/interpreter/test/instrument/RuntimeServerEmulator.scala b/engine/runtime/src/test/scala/org/enso/interpreter/test/instrument/RuntimeServerEmulator.scala new file mode 100644 index 000000000000..cea4fece68c4 --- /dev/null +++ b/engine/runtime/src/test/scala/org/enso/interpreter/test/instrument/RuntimeServerEmulator.scala @@ -0,0 +1,83 @@ +package org.enso.interpreter.test.instrument + +import akka.actor.ActorSystem +import org.enso.distribution.locking.ThreadSafeLockManager +import org.enso.lockmanager.server.LockManagerService +import org.enso.polyglot.RuntimeServerInfo +import org.enso.polyglot.runtime.Runtime.Api +import org.graalvm.polyglot.io.{MessageEndpoint, MessageTransport} + +import java.nio.ByteBuffer +import java.util.concurrent.LinkedBlockingQueue + +/** Emulates the language server for the purposes of testing. + * + * Runtime tests are run in the absence of a real language server, which is + * only simulated by manually sending requests to the runtime and checking the + * answers. + * + * However the runtime can also send requests (currently related to the locking + * mechanism) to the language server and if no language server is present, + * these requests would go unhandled and stall the tests (because they would + * wait forever on the locks). To fix this issue, this class emulates the + * request-handling part of the language server by forwarding the lock-related + * requests to a provided lock manager. Any other communication (like runtime's + * responses to fake language server requests) are directed to the provided + * message queue, so that they can be inspected by the tests. The lock-related + * communication is not forwarded to the message queue, as it is fully handled + * by this class. + * + * @param messageQueue the queue on which runtime's responses are pushed + * @param lockManager the lock manager to use for handling the lock-related + * requests + */ +class RuntimeServerEmulator( + messageQueue: LinkedBlockingQueue[Api.Response], + lockManager: ThreadSafeLockManager +) { + private val system: ActorSystem = ActorSystem("TestSystem") + private var endpoint: MessageEndpoint = _ + + private val lockManagerService = + system.actorOf(LockManagerService.props(lockManager)) + + private val connector = system.actorOf( + TestRuntimeServerConnector.props( + lockManagerService, + { response => endpoint.sendBinary(Api.serialize(response)) } + ) + ) + + /** Sends a message to the runtime. */ + def sendToRuntime(msg: Api.Request): Unit = + endpoint.sendBinary(Api.serialize(msg)) + + /** Creates a [[MessageTransport]] that should be provided when building the + * context. + */ + def makeServerTransport: MessageTransport = { (uri, peer) => + if (uri.toString == RuntimeServerInfo.URI) { + endpoint = peer + new MessageEndpoint { + override def sendText(text: String): Unit = {} + + override def sendBinary(data: ByteBuffer): Unit = { + Api.deserializeApiEnvelope(data) match { + case Some(request: Api.Request) => + connector ! request + case Some(response: Api.Response) => + messageQueue.add(response) + case None => + println("Failed to deserialize a message.") + } + } + + override def sendPing(data: ByteBuffer): Unit = {} + + override def sendPong(data: ByteBuffer): Unit = {} + + override def sendClose(): Unit = {} + } + } else null + } +} diff --git a/engine/runtime/src/test/scala/org/enso/interpreter/test/instrument/RuntimeServerTest.scala b/engine/runtime/src/test/scala/org/enso/interpreter/test/instrument/RuntimeServerTest.scala index 8f6975d0ad8e..bc3827f8ea9b 100644 --- a/engine/runtime/src/test/scala/org/enso/interpreter/test/instrument/RuntimeServerTest.scala +++ b/engine/runtime/src/test/scala/org/enso/interpreter/test/instrument/RuntimeServerTest.scala @@ -1,5 +1,7 @@ package org.enso.interpreter.test.instrument +import org.enso.distribution.FileSystem +import org.enso.distribution.locking.ThreadSafeFileLockManager import org.enso.interpreter.instrument.execution.Timer import org.enso.interpreter.runtime.`type`.{Constants, Types} import org.enso.interpreter.runtime.{Context => EnsoContext} @@ -11,14 +13,12 @@ import org.enso.polyglot.runtime.Runtime.Api import org.enso.text.editing.model import org.enso.text.editing.model.TextEdit import org.graalvm.polyglot.Context -import org.graalvm.polyglot.io.MessageEndpoint import org.scalatest.BeforeAndAfterEach import org.scalatest.flatspec.AnyFlatSpec import org.scalatest.matchers.should.Matchers import java.io.{ByteArrayOutputStream, File} -import java.nio.ByteBuffer -import java.nio.file.{Files, Paths} +import java.nio.file.{Files, Path, Paths} import java.util.UUID import java.util.concurrent.{LinkedBlockingQueue, TimeUnit} @@ -39,14 +39,17 @@ class RuntimeServerTest var context: TestContext = _ class TestContext(packageName: String) { - var endPoint: MessageEndpoint = _ val messageQueue: LinkedBlockingQueue[Api.Response] = new LinkedBlockingQueue() - val tmpDir: File = Files.createTempDirectory("enso-test-packages").toFile + val tmpDir: Path = Files.createTempDirectory("enso-test-packages") + sys.addShutdownHook(FileSystem.removeDirectoryIfExists(tmpDir)) + val lockManager = new ThreadSafeFileLockManager(tmpDir.resolve("locks")) + val runtimeServerEmulator = + new RuntimeServerEmulator(messageQueue, lockManager) val pkg: Package[File] = - PackageManager.Default.create(tmpDir, packageName, "Enso_Test") + PackageManager.Default.create(tmpDir.toFile, packageName, "Enso_Test") val out: ByteArrayOutputStream = new ByteArrayOutputStream() val logOut: ByteArrayOutputStream = new ByteArrayOutputStream() val executionContext = new PolyglotContext( @@ -67,23 +70,7 @@ class RuntimeServerTest ) .logHandler(logOut) .out(out) - .serverTransport { (uri, peer) => - if (uri.toString == RuntimeServerInfo.URI) { - endPoint = peer - new MessageEndpoint { - override def sendText(text: String): Unit = {} - - override def sendBinary(data: ByteBuffer): Unit = - Api.deserializeResponse(data).foreach(messageQueue.add) - - override def sendPing(data: ByteBuffer): Unit = {} - - override def sendPong(data: ByteBuffer): Unit = {} - - override def sendClose(): Unit = {} - } - } else null - } + .serverTransport(runtimeServerEmulator.makeServerTransport) .build() ) executionContext.context.initialize(LanguageInfo.ID) @@ -108,7 +95,7 @@ class RuntimeServerTest Files.write(file.toPath, contents.getBytes).toFile } - def send(msg: Api.Request): Unit = endPoint.sendBinary(Api.serialize(msg)) + def send(msg: Api.Request): Unit = runtimeServerEmulator.sendToRuntime(msg) def receiveNone: Option[Api.Response] = { Option(messageQueue.poll()) diff --git a/engine/runtime/src/test/scala/org/enso/interpreter/test/instrument/RuntimeStdlibTest.scala b/engine/runtime/src/test/scala/org/enso/interpreter/test/instrument/RuntimeStdlibTest.scala index e497db2c5851..6fb3cc98e64d 100644 --- a/engine/runtime/src/test/scala/org/enso/interpreter/test/instrument/RuntimeStdlibTest.scala +++ b/engine/runtime/src/test/scala/org/enso/interpreter/test/instrument/RuntimeStdlibTest.scala @@ -1,19 +1,19 @@ package org.enso.interpreter.test.instrument +import org.enso.distribution.FileSystem +import org.enso.distribution.locking.ThreadSafeFileLockManager import org.enso.interpreter.test.Metadata import org.enso.pkg.{Package, PackageManager} import org.enso.polyglot._ import org.enso.polyglot.runtime.Runtime.Api import org.enso.testkit.OsSpec import org.graalvm.polyglot.Context -import org.graalvm.polyglot.io.MessageEndpoint import org.scalatest.flatspec.AnyFlatSpec import org.scalatest.matchers.should.Matchers import org.scalatest.{BeforeAndAfterAll, BeforeAndAfterEach} import java.io.{ByteArrayOutputStream, File} -import java.nio.ByteBuffer -import java.nio.file.{Files, Paths} +import java.nio.file.{Files, Path, Paths} import java.util.UUID import java.util.concurrent.{LinkedBlockingQueue, TimeUnit} @@ -31,16 +31,19 @@ class RuntimeStdlibTest class TestContext(packageName: String) { - var endPoint: MessageEndpoint = _ val messageQueue: LinkedBlockingQueue[Api.Response] = new LinkedBlockingQueue() - val tmpDir: File = Files.createTempDirectory("enso-test-packages").toFile + val tmpDir: Path = Files.createTempDirectory("enso-test-packages") + sys.addShutdownHook(FileSystem.removeDirectoryIfExists(tmpDir)) val distributionHome: File = Paths.get("../../distribution/component").toFile.getAbsoluteFile + val lockManager = new ThreadSafeFileLockManager(tmpDir.resolve("locks")) + val runtimeServerEmulator = + new RuntimeServerEmulator(messageQueue, lockManager) val pkg: Package[File] = - PackageManager.Default.create(tmpDir, packageName, "Enso_Test") + PackageManager.Default.create(tmpDir.toFile, packageName, "Enso_Test") val out: ByteArrayOutputStream = new ByteArrayOutputStream() val executionContext = new PolyglotContext( Context @@ -57,23 +60,7 @@ class RuntimeStdlibTest .option(RuntimeServerInfo.ENABLE_OPTION, "true") .option(RuntimeOptions.INTERACTIVE_MODE, "true") .out(out) - .serverTransport { (uri, peer) => - if (uri.toString == RuntimeServerInfo.URI) { - endPoint = peer - new MessageEndpoint { - override def sendText(text: String): Unit = {} - - override def sendBinary(data: ByteBuffer): Unit = - Api.deserializeResponse(data).foreach(messageQueue.add) - - override def sendPing(data: ByteBuffer): Unit = {} - - override def sendPong(data: ByteBuffer): Unit = {} - - override def sendClose(): Unit = {} - } - } else null - } + .serverTransport(runtimeServerEmulator.makeServerTransport) .build() ) executionContext.context.initialize(LanguageInfo.ID) @@ -92,7 +79,7 @@ class RuntimeStdlibTest Files.write(file.toPath, contents.getBytes).toFile } - def send(msg: Api.Request): Unit = endPoint.sendBinary(Api.serialize(msg)) + def send(msg: Api.Request): Unit = runtimeServerEmulator.sendToRuntime(msg) def receiveOne: Option[Api.Response] = { Option(messageQueue.poll()) diff --git a/engine/runtime/src/test/scala/org/enso/interpreter/test/instrument/RuntimeSuggestionUpdatesTest.scala b/engine/runtime/src/test/scala/org/enso/interpreter/test/instrument/RuntimeSuggestionUpdatesTest.scala index 930add90da15..158adabd100d 100644 --- a/engine/runtime/src/test/scala/org/enso/interpreter/test/instrument/RuntimeSuggestionUpdatesTest.scala +++ b/engine/runtime/src/test/scala/org/enso/interpreter/test/instrument/RuntimeSuggestionUpdatesTest.scala @@ -1,25 +1,25 @@ package org.enso.interpreter.test.instrument -import java.io.{ByteArrayOutputStream, File} -import java.nio.ByteBuffer -import java.nio.file.{Files, Paths} -import java.util.UUID -import java.util.concurrent.{LinkedBlockingQueue, TimeUnit} - +import org.enso.distribution.FileSystem +import org.enso.distribution.locking.ThreadSafeFileLockManager import org.enso.interpreter.runtime.`type`.Constants import org.enso.pkg.{Package, PackageManager} import org.enso.polyglot._ import org.enso.polyglot.data.Tree import org.enso.polyglot.runtime.Runtime.Api -import org.enso.text.{ContentVersion, Sha3_224VersionCalculator} import org.enso.text.editing.model import org.enso.text.editing.model.TextEdit +import org.enso.text.{ContentVersion, Sha3_224VersionCalculator} import org.graalvm.polyglot.Context -import org.graalvm.polyglot.io.MessageEndpoint import org.scalatest.BeforeAndAfterEach import org.scalatest.flatspec.AnyFlatSpec import org.scalatest.matchers.should.Matchers +import java.io.{ByteArrayOutputStream, File} +import java.nio.file.{Files, Path, Paths} +import java.util.UUID +import java.util.concurrent.{LinkedBlockingQueue, TimeUnit} + @scala.annotation.nowarn("msg=multiarg infix syntax") class RuntimeSuggestionUpdatesTest extends AnyFlatSpec @@ -29,14 +29,17 @@ class RuntimeSuggestionUpdatesTest var context: TestContext = _ class TestContext(packageName: String) { - var endPoint: MessageEndpoint = _ val messageQueue: LinkedBlockingQueue[Api.Response] = new LinkedBlockingQueue() - val tmpDir: File = Files.createTempDirectory("enso-test-packages").toFile + val tmpDir: Path = Files.createTempDirectory("enso-test-packages") + sys.addShutdownHook(FileSystem.removeDirectoryIfExists(tmpDir)) + val lockManager = new ThreadSafeFileLockManager(tmpDir.resolve("locks")) + val runtimeServerEmulator = + new RuntimeServerEmulator(messageQueue, lockManager) val pkg: Package[File] = - PackageManager.Default.create(tmpDir, packageName, "Enso_Test") + PackageManager.Default.create(tmpDir.toFile, packageName, "Enso_Test") val out: ByteArrayOutputStream = new ByteArrayOutputStream() val executionContext = new PolyglotContext( Context @@ -54,23 +57,7 @@ class RuntimeSuggestionUpdatesTest Paths.get("../../distribution/component").toFile.getAbsolutePath ) .out(out) - .serverTransport { (uri, peer) => - if (uri.toString == RuntimeServerInfo.URI) { - endPoint = peer - new MessageEndpoint { - override def sendText(text: String): Unit = {} - - override def sendBinary(data: ByteBuffer): Unit = - Api.deserializeResponse(data).foreach(messageQueue.add) - - override def sendPing(data: ByteBuffer): Unit = {} - - override def sendPong(data: ByteBuffer): Unit = {} - - override def sendClose(): Unit = {} - } - } else null - } + .serverTransport(runtimeServerEmulator.makeServerTransport) .build() ) executionContext.context.initialize(LanguageInfo.ID) @@ -86,7 +73,7 @@ class RuntimeSuggestionUpdatesTest Files.write(file.toPath, contents.getBytes).toFile } - def send(msg: Api.Request): Unit = endPoint.sendBinary(Api.serialize(msg)) + def send(msg: Api.Request): Unit = runtimeServerEmulator.sendToRuntime(msg) def receiveNone: Option[Api.Response] = { Option(messageQueue.poll()) diff --git a/engine/runtime/src/test/scala/org/enso/interpreter/test/instrument/RuntimeVisualisationsTest.scala b/engine/runtime/src/test/scala/org/enso/interpreter/test/instrument/RuntimeVisualisationsTest.scala index 9c3676a08ad3..ea88e0ca73f7 100644 --- a/engine/runtime/src/test/scala/org/enso/interpreter/test/instrument/RuntimeVisualisationsTest.scala +++ b/engine/runtime/src/test/scala/org/enso/interpreter/test/instrument/RuntimeVisualisationsTest.scala @@ -1,10 +1,7 @@ package org.enso.interpreter.test.instrument -import java.io.{ByteArrayOutputStream, File} -import java.nio.ByteBuffer -import java.nio.file.{Files, Paths} -import java.util.UUID -import java.util.concurrent.{LinkedBlockingQueue, TimeUnit} +import org.enso.distribution.FileSystem +import org.enso.distribution.locking.ThreadSafeFileLockManager import org.enso.interpreter.instrument.execution.Timer import org.enso.interpreter.runtime.`type`.Constants import org.enso.interpreter.runtime.{Context => EnsoContext} @@ -15,11 +12,15 @@ import org.enso.polyglot.runtime.Runtime.Api import org.enso.text.editing.model import org.enso.text.editing.model.TextEdit import org.graalvm.polyglot.Context -import org.graalvm.polyglot.io.MessageEndpoint import org.scalatest.BeforeAndAfterEach import org.scalatest.flatspec.AnyFlatSpec import org.scalatest.matchers.should.Matchers +import java.io.{ByteArrayOutputStream, File} +import java.nio.file.{Files, Path, Paths} +import java.util.UUID +import java.util.concurrent.{LinkedBlockingQueue, TimeUnit} + @scala.annotation.nowarn("msg=multiarg infix syntax") class RuntimeVisualisationsTest extends AnyFlatSpec @@ -37,14 +38,17 @@ class RuntimeVisualisationsTest var context: TestContext = _ class TestContext(packageName: String) { - var endPoint: MessageEndpoint = _ val messageQueue: LinkedBlockingQueue[Api.Response] = new LinkedBlockingQueue() - val tmpDir: File = Files.createTempDirectory("enso-test-packages").toFile + val tmpDir: Path = Files.createTempDirectory("enso-test-packages") + sys.addShutdownHook(FileSystem.removeDirectoryIfExists(tmpDir)) + val lockManager = new ThreadSafeFileLockManager(tmpDir.resolve("locks")) + val runtimeServerEmulator = + new RuntimeServerEmulator(messageQueue, lockManager) val pkg: Package[File] = - PackageManager.Default.create(tmpDir, packageName, "Enso_Test") + PackageManager.Default.create(tmpDir.toFile, packageName, "Enso_Test") val out: ByteArrayOutputStream = new ByteArrayOutputStream() val logOut: ByteArrayOutputStream = new ByteArrayOutputStream() val executionContext = new PolyglotContext( @@ -65,23 +69,7 @@ class RuntimeVisualisationsTest ) .logHandler(logOut) .out(out) - .serverTransport { (uri, peer) => - if (uri.toString == RuntimeServerInfo.URI) { - endPoint = peer - new MessageEndpoint { - override def sendText(text: String): Unit = {} - - override def sendBinary(data: ByteBuffer): Unit = - Api.deserializeResponse(data).foreach(messageQueue.add) - - override def sendPing(data: ByteBuffer): Unit = {} - - override def sendPong(data: ByteBuffer): Unit = {} - - override def sendClose(): Unit = {} - } - } else null - } + .serverTransport(runtimeServerEmulator.makeServerTransport) .build() ) executionContext.context.initialize(LanguageInfo.ID) @@ -104,7 +92,7 @@ class RuntimeVisualisationsTest Files.write(file.toPath, contents.getBytes).toFile } - def send(msg: Api.Request): Unit = endPoint.sendBinary(Api.serialize(msg)) + def send(msg: Api.Request): Unit = runtimeServerEmulator.sendToRuntime(msg) def receiveNone: Option[Api.Response] = { Option(messageQueue.poll()) diff --git a/engine/runtime/src/test/scala/org/enso/interpreter/test/instrument/TestRuntimeServerConnector.scala b/engine/runtime/src/test/scala/org/enso/interpreter/test/instrument/TestRuntimeServerConnector.scala new file mode 100644 index 000000000000..cbb3d1205e6f --- /dev/null +++ b/engine/runtime/src/test/scala/org/enso/interpreter/test/instrument/TestRuntimeServerConnector.scala @@ -0,0 +1,40 @@ +package org.enso.interpreter.test.instrument + +import akka.actor.{Actor, ActorRef, Props} +import com.typesafe.scalalogging.LazyLogging +import org.enso.lockmanager.server.LockManagerService +import org.enso.polyglot.runtime.Runtime.Api + +/** A helper Actor that is used to pass messages from the runtime to the lock + * manager service and vice-versa. + * + * @param lockManagerService reference to the lock manager service actor + * @param sendResponse a callback used to send the responses from the lock + * manager service back to the runtime + */ +class TestRuntimeServerConnector( + lockManagerService: ActorRef, + sendResponse: Api.Response => Unit +) extends Actor + with LazyLogging { + override def receive: Receive = { + case request @ Api.Request(_, payload) => + if (LockManagerService.handledRequestTypes.contains(payload.getClass)) { + lockManagerService ! request + } else { + logger.warn(s"Unknown request: $request") + } + + case response: Api.Response => + sendResponse(response) + } +} + +object TestRuntimeServerConnector { + def props( + lockManagerService: ActorRef, + sendResponse: Api.Response => Unit + ): Props = Props( + new TestRuntimeServerConnector(lockManagerService, sendResponse) + ) +} diff --git a/lib/scala/connected-lock-manager/src/main/scala/org/enso/lockmanager/client/ConnectedLockManager.scala b/lib/scala/connected-lock-manager/src/main/scala/org/enso/lockmanager/client/ConnectedLockManager.scala new file mode 100644 index 000000000000..228810bb9fb2 --- /dev/null +++ b/lib/scala/connected-lock-manager/src/main/scala/org/enso/lockmanager/client/ConnectedLockManager.scala @@ -0,0 +1,104 @@ +package org.enso.lockmanager.client + +import org.enso.distribution.locking.{Lock, LockManager, LockType} +import org.enso.polyglot.runtime.Runtime +import org.enso.polyglot.runtime.Runtime.Api + +import java.util.UUID +import scala.concurrent.Await +import scala.concurrent.duration.Duration + +/** Implements the [[LockManager]] interface by using a + * [[RuntimeServerConnectionEndpoint]] and delegating the locking requests to a + * lock manager service. + */ +class ConnectedLockManager extends LockManager { + private var endpoint: Option[RuntimeServerConnectionEndpoint] = None + + /** Establishes the connection with the endpoint. + * + * The lock manager is not usable before this function is called. + */ + def connect(endpoint: RuntimeServerConnectionEndpoint): Unit = { + this.endpoint = Some(endpoint) + } + + private def getEndpoint: RuntimeServerConnectionEndpoint = + endpoint.getOrElse { + throw new IllegalStateException( + "LockManager is used before the Language Server connection has " + + "been established." + ) + } + + private def isExclusive(lockType: LockType): Boolean = lockType match { + case LockType.Exclusive => true + case LockType.Shared => false + } + + /** @inheritdoc */ + override def acquireLock(resourceName: String, lockType: LockType): Lock = { + val response = sendRequestAndWaitForResponse( + Api.AcquireLockRequest( + resourceName, + isExclusive(lockType), + returnImmediately = false + ) + ) + response match { + case Api.LockAcquired(lockId) => + WrappedConnectedLock(lockId) + case Api.LockAcquireFailed(errorMessage) => + throw new LockOperationFailed(errorMessage) + case unexpected => + throw new LockOperationFailed(s"Unexpected response [$unexpected].") + } + } + + /** @inheritdoc */ + override def tryAcquireLock( + resourceName: String, + lockType: LockType + ): Option[Lock] = { + val response = sendRequestAndWaitForResponse( + Api.AcquireLockRequest( + resourceName, + isExclusive(lockType), + returnImmediately = true + ) + ) + response match { + case Api.LockAcquired(lockId) => + Some(WrappedConnectedLock(lockId)) + case Api.CannotAcquireImmediately() => + None + case Api.LockAcquireFailed(errorMessage) => + throw new LockOperationFailed(errorMessage) + case unexpected => + throw new LockOperationFailed(s"Unexpected response [$unexpected].") + } + } + + private def sendRequestAndWaitForResponse( + request: Runtime.ApiRequest + ): Runtime.ApiResponse = { + val future = getEndpoint.sendRequest(request) + Await.result(future, Duration.Inf) + } + + private case class WrappedConnectedLock(lockId: UUID) extends Lock { + override def release(): Unit = sendRequestAndWaitForResponse( + Runtime.Api.ReleaseLockRequest(lockId) + ) match { + case Api.LockReleased() => + case Api.LockReleaseFailed(errorMessage) => + throw new LockOperationFailed(errorMessage) + case unexpected => + throw new LockOperationFailed(s"Unexpected response [$unexpected].") + } + } + + /** Indicates that the lock operation has failed due to some internal errors. + */ + class LockOperationFailed(message: String) extends RuntimeException(message) +} diff --git a/lib/scala/connected-lock-manager/src/main/scala/org/enso/lockmanager/client/RuntimeServerConnectionEndpoint.scala b/lib/scala/connected-lock-manager/src/main/scala/org/enso/lockmanager/client/RuntimeServerConnectionEndpoint.scala new file mode 100644 index 000000000000..07247fb6a579 --- /dev/null +++ b/lib/scala/connected-lock-manager/src/main/scala/org/enso/lockmanager/client/RuntimeServerConnectionEndpoint.scala @@ -0,0 +1,16 @@ +package org.enso.lockmanager.client + +import org.enso.polyglot.runtime.Runtime.{ApiRequest, ApiResponse} + +import scala.concurrent.Future + +/** A connection with the Language Server which provides a simple Future-based + * interface for sending requests and receiving corresponding responses. + */ +trait RuntimeServerConnectionEndpoint { + + /** Sends a request to the language server and returns a Future that is + * completed once a correlated response has been received. + */ + def sendRequest(msg: ApiRequest): Future[ApiResponse] +} diff --git a/lib/scala/connected-lock-manager/src/main/scala/org/enso/lockmanager/client/RuntimeServerRequestHandler.scala b/lib/scala/connected-lock-manager/src/main/scala/org/enso/lockmanager/client/RuntimeServerRequestHandler.scala new file mode 100644 index 000000000000..cf5d8810de03 --- /dev/null +++ b/lib/scala/connected-lock-manager/src/main/scala/org/enso/lockmanager/client/RuntimeServerRequestHandler.scala @@ -0,0 +1,71 @@ +package org.enso.lockmanager.client +import com.typesafe.scalalogging.Logger +import org.enso.polyglot.runtime.Runtime.{Api, ApiRequest, ApiResponse} + +import java.util.UUID +import scala.concurrent.{Future, Promise} +import scala.util.Success + +/** Encapsulates the logic of keeping track of sent requests and completing + * their futures with corresponding replies to implement the + * [[RuntimeServerConnectionEndpoint]] interface. + */ +abstract class RuntimeServerRequestHandler + extends RuntimeServerConnectionEndpoint { + + /** Keeps requests that are currently in-flight, mapping their identifiers to + * their corresponding promises. + */ + private val knownRequests + : collection.concurrent.Map[UUID, Promise[ApiResponse]] = + collection.concurrent.TrieMap.empty + + private lazy val logger = Logger[this.type] + + /** @inheritdoc */ + override def sendRequest( + msg: ApiRequest + ): Future[ApiResponse] = { + val promise = Promise[ApiResponse]() + val uuid = UUID.randomUUID() + registerPromise(uuid, promise) + val request = Api.Request(uuid, msg) + sendToClient(request) + promise.future + } + + /** Registers a promise associated with a request id. */ + private def registerPromise( + requestId: UUID, + promise: Promise[ApiResponse] + ): Unit = knownRequests.put(requestId, promise) + + /** The method that must be overridden by the user which defines how the + * requests are actually sent to the Language Server. + */ + def sendToClient(request: Api.Request): Unit + + /** The method that must be called by the user when any response is received + * from the language server. + * + * It checks the [[knownRequests]] and completes the corresponding promise + * with the response. + */ + def onResponseReceived(response: Api.Response): Unit = response match { + case Api.Response(None, payload) => + logger.warn( + s"Received a notification [$payload], but passing notifications from " + + s"the Language Server to the Runtime is currently not supported." + ) + + case Api.Response(Some(correlationId), payload) => + knownRequests.remove(correlationId) match { + case Some(promise) => + promise.complete(Success(payload)) + case None => + logger.warn( + s"Received a response to an unknown request: [$correlationId]." + ) + } + } +} diff --git a/lib/scala/connected-lock-manager/src/main/scala/org/enso/lockmanager/server/LockManagerService.scala b/lib/scala/connected-lock-manager/src/main/scala/org/enso/lockmanager/server/LockManagerService.scala new file mode 100644 index 000000000000..4bc38cff8974 --- /dev/null +++ b/lib/scala/connected-lock-manager/src/main/scala/org/enso/lockmanager/server/LockManagerService.scala @@ -0,0 +1,155 @@ +package org.enso.lockmanager.server + +import akka.actor.{Actor, ActorRef, Props} +import akka.pattern.pipe +import org.enso.distribution.locking.{Lock, LockType, ThreadSafeLockManager} +import org.enso.lockmanager.server.LockManagerService.InternalLockAcquired +import org.enso.polyglot.runtime.Runtime +import org.enso.polyglot.runtime.Runtime.Api.{ + AcquireLockRequest, + ReleaseLockRequest +} + +import java.util.UUID +import java.util.concurrent.Executors +import scala.concurrent.{ExecutionContext, Future} +import scala.util.control.NonFatal + +/** A service that wraps an underlying [[ThreadSafeLockManager]] instance and + * exposes its interface to any ConnectedLockManager instances which can + * connect to this service using the runtime connection. + * + * @param underlyingLockManager the lock manager to wrap + */ +class LockManagerService(underlyingLockManager: ThreadSafeLockManager) + extends Actor { + + /** Internal state of the actor, keeping currently held locks identified by + * unique random identifiers. + */ + case class State(locks: Map[UUID, Lock]) { + def addLock(lock: Lock): (State, UUID) = { + val id = UUID.randomUUID() + if (locks.contains(id)) addLock(lock) + else (copy(locks = locks.updated(id, lock)), id) + } + + def removeLock(id: UUID): Option[(State, Lock)] = { + val newState = copy(locks = locks.removed(id)) + locks.get(id).map((newState, _)) + } + } + + implicit private val ec: ExecutionContext = + ExecutionContext.fromExecutor(Executors.newCachedThreadPool()) + + override def receive: Receive = mainStage(State(Map.empty)) + + private def mainStage(state: State): Receive = { + case Runtime.Api.Request( + requestId, + AcquireLockRequest(resourceName, exclusive, returnImmediately) + ) => + val lockType = if (exclusive) LockType.Exclusive else LockType.Shared + if (returnImmediately) { + val response = acquireLockImmediately(state, resourceName, lockType) + sender() ! Runtime.Api.Response(requestId, response) + } else { + val replyTo = sender() + Future(underlyingLockManager.acquireLock(resourceName, lockType)) + .map(Right(_)) + .recover(Left(_)) + .map(InternalLockAcquired(requestId, replyTo, _)) pipeTo self + } + + case InternalLockAcquired(requestId, replyTo, result) => + result match { + case Left(exception) => + replyTo ! Runtime.Api.Response( + requestId, + Runtime.Api.LockAcquireFailed( + s"${exception.toString}: ${exception.getMessage}" + ) + ) + case Right(lock) => + val (newState, lockId) = state.addLock(lock) + context.become(mainStage(newState)) + replyTo ! Runtime.Api.Response( + requestId, + Runtime.Api.LockAcquired(lockId) + ) + } + + case Runtime.Api.Request(requestId, ReleaseLockRequest(lockId)) => + val response = releaseLock(state, lockId) + sender() ! Runtime.Api.Response(requestId, response) + } + + private def releaseLock( + state: State, + lockId: UUID + ): Runtime.ApiResponse = state.removeLock(lockId) match { + case Some((newState, lock)) => + try { + lock.release() + context.become(mainStage(newState)) + Runtime.Api.LockReleased() + } catch { + case NonFatal(error) => + Runtime.Api.LockReleaseFailed( + s"${error.toString}: ${error.getMessage}" + ) + } + case None => + Runtime.Api.LockReleaseFailed(s"Lock with id [$lockId] is not known.") + } + + private def acquireLockImmediately( + state: State, + resourceName: String, + lockType: LockType + ): Runtime.ApiResponse = try { + underlyingLockManager.tryAcquireLock(resourceName, lockType) match { + case Some(lock) => + val (newState, lockId) = state.addLock(lock) + context.become(mainStage(newState)) + Runtime.Api.LockAcquired(lockId) + case None => + Runtime.Api.CannotAcquireImmediately() + } + } catch { + case NonFatal(exception) => + Runtime.Api.LockAcquireFailed( + s"${exception.toString}: ${exception.getMessage}" + ) + } +} + +object LockManagerService { + + /** Creates a configuration object to create [[LockManagerService]]. + * + * @param underlyingLockManager the lock manager to wrap + */ + def props(underlyingLockManager: ThreadSafeLockManager): Props = Props( + new LockManagerService(underlyingLockManager) + ) + + /** Indicates the types of requests from the runtime that this service + * handles. + */ + def handledRequestTypes: Seq[Class[_ <: Runtime.ApiRequest]] = Seq( + classOf[Runtime.Api.AcquireLockRequest], + classOf[Runtime.Api.ReleaseLockRequest] + ) + + /** An internal message used to pass the result of a Future waiting for a lock + * to be acquired back to the actor to update the state and send the + * response. + */ + case class InternalLockAcquired( + requestId: Option[Runtime.Api.RequestId], + replyTo: ActorRef, + result: Either[Throwable, Lock] + ) +} diff --git a/lib/scala/connected-lock-manager/src/test/scala/org/enso/lockmanager/ActorToHandlerConnector.scala b/lib/scala/connected-lock-manager/src/test/scala/org/enso/lockmanager/ActorToHandlerConnector.scala new file mode 100644 index 000000000000..13eaed64c3c3 --- /dev/null +++ b/lib/scala/connected-lock-manager/src/test/scala/org/enso/lockmanager/ActorToHandlerConnector.scala @@ -0,0 +1,44 @@ +package org.enso.lockmanager + +import akka.actor.{Actor, Props, Stash} +import org.enso.lockmanager.ActorToHandlerConnector.SetRequestHandler +import org.enso.lockmanager.client.RuntimeServerRequestHandler +import org.enso.polyglot.runtime.Runtime.Api + +/** A helper Actor that forwards messages sent to it to a request handler. + * + * It is used so that the messages sent from the endpoint can have some actor + * set as their sender, so that the lock manager service has a destination to + * reply to. It then forwards any received messages to the endpoint. + */ +class ActorToHandlerConnector extends Actor with Stash { + + override def receive: Receive = initializingStage + + private def initializedStage( + requestHandler: RuntimeServerRequestHandler + ): Receive = { case response: Api.Response => + requestHandler.onResponseReceived(response) + } + + private def initializingStage: Receive = { + case SetRequestHandler(handler) => + context.become(initializedStage(handler)) + case _ => + stash() + } + +} + +object ActorToHandlerConnector { + def props(): Props = Props( + new ActorToHandlerConnector + ) + + /** A message that must be sent to the [[ActorToHandlerConnector]] to + * initialize it. + */ + case class SetRequestHandler( + runtimeServerRequestHandler: RuntimeServerRequestHandler + ) +} diff --git a/lib/scala/connected-lock-manager/src/test/scala/org/enso/lockmanager/ConnectedLockManagerTest.scala b/lib/scala/connected-lock-manager/src/test/scala/org/enso/lockmanager/ConnectedLockManagerTest.scala new file mode 100644 index 000000000000..197a977cc4aa --- /dev/null +++ b/lib/scala/connected-lock-manager/src/test/scala/org/enso/lockmanager/ConnectedLockManagerTest.scala @@ -0,0 +1,154 @@ +package org.enso.lockmanager + +import akka.actor.ActorSystem +import akka.testkit.TestKit +import org.enso.distribution.locking.{ + LockManager, + LockType, + ThreadSafeFileLockManager +} +import org.enso.lockmanager.ActorToHandlerConnector.SetRequestHandler +import org.enso.lockmanager.client.{ + ConnectedLockManager, + RuntimeServerRequestHandler +} +import org.enso.lockmanager.server.LockManagerService +import org.enso.polyglot.runtime.Runtime.Api +import org.enso.testkit.{TestSynchronizer, WithTemporaryDirectory} +import org.scalatest.matchers.should.Matchers +import org.scalatest.wordspec.AnyWordSpecLike + +import scala.util.Using + +class ConnectedLockManagerTest + extends TestKit(ActorSystem("TestSystem")) + with AnyWordSpecLike + with Matchers + with WithTemporaryDirectory { + + private def lockRoot = getTestDirectory.resolve("locks") + + def setupLockManagers(): (LockManager, LockManager) = { + val primaryLockManager = new ThreadSafeFileLockManager(lockRoot) + + val lockManagerService = + system.actorOf(LockManagerService.props(primaryLockManager)) + + val connector = system.actorOf(ActorToHandlerConnector.props()) + val endpoint = new RuntimeServerRequestHandler { + override def sendToClient(request: Api.Request): Unit = + lockManagerService.tell(request, sender = connector) + } + + connector ! SetRequestHandler(endpoint) + + val connectedLockManager = new ConnectedLockManager + connectedLockManager.connect(endpoint) + + (primaryLockManager, connectedLockManager) + } + + private val resource = "test-resource" + + "ConnectedLockManager" should { + "share exclusive locks between the connected lock manager and the primary one" in { + val sync = new TestSynchronizer + + val (primary, connected) = setupLockManagers() + + sync.startThread("primary") { + Using( + primary.acquireLockWithWaitingAction( + resource, + LockType.Exclusive, + () => + throw new RuntimeException( + "First locker should not have to wait!" + ) + ) + ) { _ => + sync.signal("primary-acquired") + sync.report("primary-acquired") + sync.waitFor("connected-is-waiting") + sync.report("primary-releasing") + } + } + + sync.startThread("connected") { + sync.waitFor("primary-acquired") + Using( + connected.acquireLockWithWaitingAction( + resource, + LockType.Exclusive, + () => { + sync.report("connected-waiting") + sync.signal("connected-is-waiting") + } + ) + ) { _ => + sync.report("connected-acquired") + } + } + + sync.join() + sync.summarizeReports() shouldEqual Seq( + "primary-acquired", + "connected-waiting", + "primary-releasing", + "connected-acquired" + ) + } + + "correctly handle shared locks" in { + val sync = new TestSynchronizer + + val (primary, connected) = setupLockManagers() + + sync.startThread("primary") { + Using( + primary.acquireLockWithWaitingAction( + resource, + LockType.Shared, + () => + throw new RuntimeException( + "First locker should not have to wait!" + ) + ) + ) { _ => + sync.signal("primary-acquired") + sync.report("primary-acquired") + sync.waitFor("connected-acquired") + } + + sync.report("released") + } + + sync.startThread("connected") { + sync.waitFor("primary-acquired") + Using( + connected.acquireLockWithWaitingAction( + resource, + LockType.Shared, + () => + throw new RuntimeException( + "Second locker should not have to wait!" + ) + ) + ) { _ => + sync.report("connected-acquired") + sync.signal("connected-acquired") + } + + sync.report("released") + } + + sync.join() + sync.summarizeReports() shouldEqual Seq( + "primary-acquired", + "connected-acquired", + "released", + "released" + ) + } + } +} diff --git a/lib/scala/distribution-manager/src/main/scala/org/enso/distribution/LanguageHome.scala b/lib/scala/distribution-manager/src/main/scala/org/enso/distribution/LanguageHome.scala index 5f64972cbb02..ea09e1f4c210 100644 --- a/lib/scala/distribution-manager/src/main/scala/org/enso/distribution/LanguageHome.scala +++ b/lib/scala/distribution-manager/src/main/scala/org/enso/distribution/LanguageHome.scala @@ -4,6 +4,9 @@ import java.nio.file.Path /** A helper class that provides paths for bundled components inside of an * engine distribution from which the runtime is being run. + * + * @param languageHome the path to the directory containing the runner.jar and + * runtime.jar of the currently running language runtime */ case class LanguageHome(languageHome: Path) { private val rootPath = languageHome.getParent.toAbsolutePath.normalize @@ -25,7 +28,8 @@ object LanguageHome { /** Finds the [[LanguageHome]] based on the path of the runner JAR. * - * Only guaranteed to work properly if used in a component that is started by the `engine-runner`. + * Only guaranteed to work properly if used in a component that is started by + * the `engine-runner`. */ def detectFromExecutableLocation(environment: Environment): LanguageHome = { val homePath = environment.getPathToRunningExecutable.getParent diff --git a/lib/scala/distribution-manager/src/main/scala/org/enso/distribution/locking/ResourceManager.scala b/lib/scala/distribution-manager/src/main/scala/org/enso/distribution/locking/ResourceManager.scala index 7234f7cefc02..88a670223edf 100644 --- a/lib/scala/distribution-manager/src/main/scala/org/enso/distribution/locking/ResourceManager.scala +++ b/lib/scala/distribution-manager/src/main/scala/org/enso/distribution/locking/ResourceManager.scala @@ -62,67 +62,6 @@ class ResourceManager(lockManager: LockManager) { lock } - var mainLock: Option[Lock] = None - - /** Initializes the [[MainLock]]. */ - def initializeMainLock(): Unit = { - val lock = - lockManager - .tryAcquireLock(MainLock.name, LockType.Shared) - .getOrElse { - throw DistributionIsModifiedError(MainLock.waitMessage) - } - mainLock = Some(lock) - } - - /** Exception that is thrown when the main lock is held exclusively. - * - * This situation means that the current distribution is being installed or - * uninstalled, so it should not be used in the meantime and the application - * has to terminate immediately. - */ - case class DistributionIsModifiedError(message: String) - extends RuntimeException(message) - - /** Acquires an exclusive main lock (first releasing the shared lock), - * ensuring that no other processes using this distribution can be running in - * parallel. - * - * @param waitAction function that is executed if the lock cannot be acquired - * immediately - */ - def acquireExclusiveMainLock(waitAction: () => Unit): Unit = { - mainLock match { - case Some(oldLock) => - oldLock.release() - mainLock = None - case None => - } - - val lock = lockManager.acquireLockWithWaitingAction( - MainLock.name, - LockType.Exclusive, - waitAction - ) - mainLock = Some(lock) - } - - /** Releases the main lock. - * - * Should be called just before the program terminates. It is not an error to - * skip it, as the operating system should unlock all resources after the - * program terminates, but on some platforms this automatic 'garbage - * collection for locks' may take some time, so it is better to release it - * manually. - */ - def releaseMainLock(): Unit = - mainLock match { - case Some(lock) => - lock.release() - mainLock = None - case None => - } - /** Runs the provided `action` if an exclusive lock can be immediately * acquired for the temporary directory, i.e. the directory is not used by * anyone. @@ -190,16 +129,4 @@ class ResourceManager(lockManager: LockManager) { "the installation has to wait until that is complete to avoid " + "conflicts. It should not take a long time." } - - /** The main lock that is held by all launcher processes. - * - * It is used to ensure that no other processes are running when the - * distribution is being installed or uninstalled. - */ - private case object MainLock extends Resource { - override def name: String = "launcher-main" - override def waitMessage: String = - "Another process is installing or uninstalling the current " + - "distribution. Please wait until that finishes." - } } diff --git a/lib/scala/distribution-manager/src/main/scala/org/enso/distribution/locking/ThreadSafeFileLockManager.scala b/lib/scala/distribution-manager/src/main/scala/org/enso/distribution/locking/ThreadSafeFileLockManager.scala index ca8b4b1c7072..e2e50886a879 100644 --- a/lib/scala/distribution-manager/src/main/scala/org/enso/distribution/locking/ThreadSafeFileLockManager.scala +++ b/lib/scala/distribution-manager/src/main/scala/org/enso/distribution/locking/ThreadSafeFileLockManager.scala @@ -33,7 +33,7 @@ import java.nio.file.Path * * @param locksRoot the directory in which lockfiles should be kept */ -class ThreadSafeFileLockManager(locksRoot: Path) extends LockManager { +class ThreadSafeFileLockManager(locksRoot: Path) extends ThreadSafeLockManager { val fileLockManager = new FileLockManager(locksRoot) val localLocks = collection.concurrent.TrieMap.empty[String, ThreadSafeLock] diff --git a/lib/scala/distribution-manager/src/main/scala/org/enso/distribution/locking/ThreadSafeLockManager.scala b/lib/scala/distribution-manager/src/main/scala/org/enso/distribution/locking/ThreadSafeLockManager.scala new file mode 100644 index 000000000000..9aaee3d83e80 --- /dev/null +++ b/lib/scala/distribution-manager/src/main/scala/org/enso/distribution/locking/ThreadSafeLockManager.scala @@ -0,0 +1,4 @@ +package org.enso.distribution.locking + +/** A [[LockManager]] which guarantees to be thread-safe. */ +trait ThreadSafeLockManager extends LockManager diff --git a/lib/scala/json-rpc-server-test/src/main/scala/org/enso/jsonrpc/test/JsonRpcServerTestKit.scala b/lib/scala/json-rpc-server-test/src/main/scala/org/enso/jsonrpc/test/JsonRpcServerTestKit.scala index f2d62e2e760b..62b12446d90c 100644 --- a/lib/scala/json-rpc-server-test/src/main/scala/org/enso/jsonrpc/test/JsonRpcServerTestKit.scala +++ b/lib/scala/json-rpc-server-test/src/main/scala/org/enso/jsonrpc/test/JsonRpcServerTestKit.scala @@ -52,7 +52,7 @@ abstract class JsonRpcServerTestKit def clientControllerFactory: ClientControllerFactory override def beforeEach(): Unit = { - + super.beforeEach() server = new JsonRpcServer(protocol, clientControllerFactory) binding = Await.result(server.bind(interface, port = 0), 3.seconds) address = s"ws://$interface:${binding.localAddress.getPort}" @@ -60,6 +60,7 @@ abstract class JsonRpcServerTestKit override def afterEach(): Unit = { val _ = binding.unbind() + super.afterEach() } class WsTestClient(address: String, debugMessages: Boolean = false) { diff --git a/lib/scala/library-manager-test/src/main/scala/org/enso/librarymanager/published/repository/DownloaderTest.scala b/lib/scala/library-manager-test/src/main/scala/org/enso/librarymanager/published/repository/DownloaderTest.scala index 252dd46b1662..18994b618648 100644 --- a/lib/scala/library-manager-test/src/main/scala/org/enso/librarymanager/published/repository/DownloaderTest.scala +++ b/lib/scala/library-manager-test/src/main/scala/org/enso/librarymanager/published/repository/DownloaderTest.scala @@ -41,7 +41,6 @@ trait DownloaderTest { self: HasTestDirectory => action(cache) } finally { - resourceManager.releaseMainLock() resourceManager.unlockTemporaryDirectory() } } diff --git a/lib/scala/library-manager/src/main/scala/org/enso/librarymanager/DefaultLibraryProvider.scala b/lib/scala/library-manager/src/main/scala/org/enso/librarymanager/DefaultLibraryProvider.scala index 6582d26f59f7..e0343bc21921 100644 --- a/lib/scala/library-manager/src/main/scala/org/enso/librarymanager/DefaultLibraryProvider.scala +++ b/lib/scala/library-manager/src/main/scala/org/enso/librarymanager/DefaultLibraryProvider.scala @@ -72,7 +72,7 @@ class DefaultLibraryProvider private ( .map(ResolvedLibrary(libraryName, version, _)) .toEither .left - .map(ResolvingLibraryProvider.Error.DownloadFailed) + .map(ResolvingLibraryProvider.Error.DownloadFailed(version, _)) } } } diff --git a/lib/scala/library-manager/src/main/scala/org/enso/librarymanager/ResolvingLibraryProvider.scala b/lib/scala/library-manager/src/main/scala/org/enso/librarymanager/ResolvingLibraryProvider.scala index fe32a81b4749..90391bb717f4 100644 --- a/lib/scala/library-manager/src/main/scala/org/enso/librarymanager/ResolvingLibraryProvider.scala +++ b/lib/scala/library-manager/src/main/scala/org/enso/librarymanager/ResolvingLibraryProvider.scala @@ -1,6 +1,6 @@ package org.enso.librarymanager -import org.enso.editions.LibraryName +import org.enso.editions.{LibraryName, LibraryVersion} /** A helper class for resolving libraries. */ trait ResolvingLibraryProvider { @@ -37,6 +37,9 @@ object ResolvingLibraryProvider { /** Indicates that the library version was missing and had to be downloaded, * but the download has failed. */ - case class DownloadFailed(reason: Throwable) extends Error + case class DownloadFailed( + version: LibraryVersion.Published, + reason: Throwable + ) extends Error } } diff --git a/lib/scala/project-manager/src/test/scala/org/enso/projectmanager/protocol/ProjectManagementApiSpec.scala b/lib/scala/project-manager/src/test/scala/org/enso/projectmanager/protocol/ProjectManagementApiSpec.scala index 3d9ba7a03a51..9d0070066aaf 100644 --- a/lib/scala/project-manager/src/test/scala/org/enso/projectmanager/protocol/ProjectManagementApiSpec.scala +++ b/lib/scala/project-manager/src/test/scala/org/enso/projectmanager/protocol/ProjectManagementApiSpec.scala @@ -1,15 +1,17 @@ package org.enso.projectmanager.protocol +import akka.testkit.TestDuration import io.circe.literal._ import nl.gn0s1s.bump.SemVer import org.apache.commons.io.FileUtils import org.enso.editions.SemVerJson._ import org.enso.projectmanager.{BaseServerSpec, ProjectManagementOps} import org.enso.testkit.{FlakySpec, RetrySpec} + import java.io.File import java.nio.file.{Files, Paths} import java.util.UUID - +import scala.concurrent.duration._ import scala.io.Source class ProjectManagementApiSpec @@ -761,7 +763,7 @@ class ProjectManagementApiSpec deleteProject(bazId) } - "return a list of projects even if editions of some of them cannot be resolved" in { + "return a list of projects even if editions of some of them cannot be resolved" taggedAs Retry in { implicit val client = new WsTestClient(address) //given val fooId = createProject("Foo") @@ -781,7 +783,8 @@ class ProjectManagementApiSpec } """) //then - client.expectJson(json""" + client.expectJson( + json""" { "jsonrpc":"2.0", "id":0, @@ -804,7 +807,9 @@ class ProjectManagementApiSpec ] } } - """) + """, + timeout = 10.seconds.dilated + ) deleteProject(fooId) deleteProject(barId) } diff --git a/lib/scala/runtime-version-manager-test/src/main/scala/org/enso/runtimeversionmanager/test/SlowTestSynchronizer.scala b/lib/scala/runtime-version-manager-test/src/main/scala/org/enso/runtimeversionmanager/test/SlowTestSynchronizer.scala new file mode 100644 index 000000000000..b3ca61b277ec --- /dev/null +++ b/lib/scala/runtime-version-manager-test/src/main/scala/org/enso/runtimeversionmanager/test/SlowTestSynchronizer.scala @@ -0,0 +1,7 @@ +package org.enso.runtimeversionmanager.test + +import org.enso.testkit.TestSynchronizer + +class SlowTestSynchronizer extends TestSynchronizer { + override val timeOutSeconds: Long = 90 +} diff --git a/lib/scala/runtime-version-manager-test/src/main/scala/org/enso/runtimeversionmanager/test/TestLocalLockManager.scala b/lib/scala/runtime-version-manager-test/src/main/scala/org/enso/runtimeversionmanager/test/TestLocalLockManager.scala index 2f34a2398f5e..d5c17b289ee6 100644 --- a/lib/scala/runtime-version-manager-test/src/main/scala/org/enso/runtimeversionmanager/test/TestLocalLockManager.scala +++ b/lib/scala/runtime-version-manager-test/src/main/scala/org/enso/runtimeversionmanager/test/TestLocalLockManager.scala @@ -1,6 +1,6 @@ package org.enso.runtimeversionmanager.test -import org.enso.distribution.locking.{Lock, LockManager, LockType} +import org.enso.distribution.locking.{Lock, LockType, ThreadSafeLockManager} import java.util.concurrent.TimeUnit import java.util.concurrent.locks.{ @@ -9,7 +9,7 @@ import java.util.concurrent.locks.{ Lock => JLock } -/** A [[LockManager]] that creates process-local locks. +/** A [[ThreadSafeLockManager]] that creates process-local locks. * * The locks are not visible by other processes, so this manager is not useful * for synchronizing multiple processes. It can be used to test concurrency @@ -19,10 +19,9 @@ import java.util.concurrent.locks.{ * or the thread is interrupted. To aid with testing, this implementation times * out after 30 seconds. */ -class TestLocalLockManager extends LockManager { +class TestLocalLockManager extends ThreadSafeLockManager { - /** @inheritdoc - */ + /** @inheritdoc */ override def acquireLock(resourceName: String, lockType: LockType): Lock = { val lock = getLock(resourceName, lockType) val locked = lock.tryLock(30, TimeUnit.SECONDS) @@ -34,8 +33,7 @@ class TestLocalLockManager extends LockManager { WrapLock(lock) } - /** @inheritdoc - */ + /** @inheritdoc */ override def tryAcquireLock( resourceName: String, lockType: LockType diff --git a/lib/scala/runtime-version-manager-test/src/main/scala/org/enso/runtimeversionmanager/test/TestableThreadSafeFileLockManager.scala b/lib/scala/runtime-version-manager-test/src/main/scala/org/enso/runtimeversionmanager/test/TestableThreadSafeFileLockManager.scala new file mode 100644 index 000000000000..4cbfff84bd32 --- /dev/null +++ b/lib/scala/runtime-version-manager-test/src/main/scala/org/enso/runtimeversionmanager/test/TestableThreadSafeFileLockManager.scala @@ -0,0 +1,27 @@ +package org.enso.runtimeversionmanager.test + +import org.enso.distribution.locking.ThreadSafeFileLockManager + +import java.nio.file.Path + +class TestableThreadSafeFileLockManager(locksRoot: Path) + extends ThreadSafeFileLockManager(locksRoot) { + + /** A helper function that can be called by the test suite to release all file locks. + * + * It is only safe to call this function if it can be guaranteed that no + * locks created with this manager are still in scope. + * + * Normally all file locks are released automatically when the JVM exits, + * but as tests run within a single JVM, this is not the case and any + * dangling locks will cause problems when cleaning the temporary + * directory. + */ + def releaseAllLocks(): Unit = { + localLocks.foreach { case (_, lock) => + lock.fileLock.foreach(_.release()) + lock.fileLock = None + } + localLocks.clear() + } +} diff --git a/lib/scala/runtime-version-manager-test/src/test/scala/org/enso/distribution/locking/ConcurrencyTest.scala b/lib/scala/runtime-version-manager-test/src/test/scala/org/enso/distribution/locking/ConcurrencyTest.scala index 65d19ecf1876..26b02c972258 100644 --- a/lib/scala/runtime-version-manager-test/src/test/scala/org/enso/distribution/locking/ConcurrencyTest.scala +++ b/lib/scala/runtime-version-manager-test/src/test/scala/org/enso/distribution/locking/ConcurrencyTest.scala @@ -1,21 +1,13 @@ package org.enso.distribution.locking -import java.nio.file.{Files, Path} import nl.gn0s1s.bump.SemVer import org.enso.cli.task.TaskProgress +import org.enso.distribution.FileSystem.PathSyntax import org.enso.distribution.{ DistributionManager, FileSystem, TemporaryDirectoryManager } -import org.enso.distribution.locking.{ - LockManager, - LockType, - LockUserInterface, - Resource, - ResourceManager -} -import org.enso.distribution.FileSystem.PathSyntax import org.enso.runtimeversionmanager.components.{ GraalVMComponentConfiguration, GraalVMVersion, @@ -38,6 +30,7 @@ import org.scalatest.time.Span import org.scalatest.time.SpanSugar.convertIntToGrainOfTime import org.scalatest.wordspec.AnyWordSpec +import java.nio.file.{Files, Path} import scala.util.Try class ConcurrencyTest @@ -73,10 +66,6 @@ class ConcurrencyTest } } - class SlowTestSynchronizer extends TestSynchronizer { - override val timeOutSeconds: Long = 90 - } - var testLocalLockManager: Option[LockManager] = None override def beforeEach(): Unit = { @@ -371,72 +360,5 @@ class ConcurrencyTest "t2-uninstalled" ) } - - "synchronize main lock" taggedAs Retry in { - - /** First two threads start and acquire the shared lock, than the third - * thread tries to acquire an exclusive lock (in practice that will be our - * (un)installer), it should wait for the other threads to finish. When - * the threads see that it started waiting (the waiting notification is - * normally used to tell the user what the application is waiting for), - * the two threads finish and after that the third one is able to acquire - * the exclusive lock. - */ - val sync = new SlowTestSynchronizer - sync.startThread("t1") { - val resourceManager = makeNewResourceManager() - resourceManager.initializeMainLock() - sync.report("shared-start") - sync.signal("started-1") - sync.waitFor("finish-1") - sync.report("shared-end") - resourceManager.releaseMainLock() - } - - sync.startThread("t2") { - val resourceManager = makeNewResourceManager() - resourceManager.initializeMainLock() - sync.report("shared-start") - sync.signal("started-2") - sync.waitFor("finish-2") - sync.report("shared-end") - resourceManager.releaseMainLock() - } - - sync.waitFor("started-1") - sync.waitFor("started-2") - - sync.startThread("t3") { - val resourceManager = makeNewResourceManager() - resourceManager.initializeMainLock() - sync.report("t3-start") - resourceManager.acquireExclusiveMainLock(() => { - sync.report("t3-wait") - sync.signal("waiting") - }) - sync.report("t3-end") - sync.signal("finish-all") - resourceManager.releaseMainLock() - } - - sync.waitFor("waiting") - Thread.sleep(1000) - - sync.signal("finish-1") - sync.signal("finish-2") - - sync.waitFor("finish-all") - - sync.join() - sync.summarizeReports() shouldEqual Seq( - "shared-start", - "shared-start", - "t3-start", - "t3-wait", - "shared-end", - "shared-end", - "t3-end" - ) - } } } diff --git a/lib/scala/runtime-version-manager-test/src/test/scala/org/enso/distribution/locking/ThreadSafeFileLockManagerTest.scala b/lib/scala/runtime-version-manager-test/src/test/scala/org/enso/distribution/locking/ThreadSafeFileLockManagerTest.scala index 9703ecda60f9..a70daaa8a32d 100644 --- a/lib/scala/runtime-version-manager-test/src/test/scala/org/enso/distribution/locking/ThreadSafeFileLockManagerTest.scala +++ b/lib/scala/runtime-version-manager-test/src/test/scala/org/enso/distribution/locking/ThreadSafeFileLockManagerTest.scala @@ -1,15 +1,11 @@ package org.enso.distribution.locking import org.enso.distribution.locking.LockType.Shared -import org.enso.distribution.locking.{ - FileLockManager, - LockType, - ThreadSafeFileLockManager +import org.enso.runtimeversionmanager.test.{ + NativeTestHelper, + TestableThreadSafeFileLockManager } - -import java.nio.file.Path -import org.enso.runtimeversionmanager.test.{NativeTestHelper, TestSynchronizer} -import org.enso.testkit.WithTemporaryDirectory +import org.enso.testkit.{TestSynchronizer, WithTemporaryDirectory} import org.scalatest.OptionValues import org.scalatest.concurrent.TimeLimitedTests import org.scalatest.matchers.should.Matchers @@ -25,29 +21,7 @@ class ThreadSafeFileLockManagerTest with OptionValues with NativeTestHelper { - val timeLimit: Span = 30.seconds - - class TestableThreadSafeFileLockManager(locksRoot: Path) - extends ThreadSafeFileLockManager(locksRoot) { - - /** A helper function that can be called by the test suite to release all file locks. - * - * It is only safe to call this function if it can be guaranteed that no - * locks created with this manager are still in scope. - * - * Normally all file locks are released automatically when the JVM exits, - * but as tests run within a single JVM,this is not the case and any - * dangling locks will cause problems when cleaning the temporary - * directory. - */ - def releaseAllLocks(): Unit = { - localLocks.foreach { case (_, lock) => - lock.fileLock.foreach(_.release()) - lock.fileLock = None - } - localLocks.clear() - } - } + override val timeLimit: Span = 30.seconds private var testLocalLockManager: Option[TestableThreadSafeFileLockManager] = None diff --git a/lib/scala/runtime-version-manager-test/src/main/scala/org/enso/runtimeversionmanager/test/TestSynchronizer.scala b/lib/scala/testkit/src/main/scala/org/enso/testkit/TestSynchronizer.scala similarity index 99% rename from lib/scala/runtime-version-manager-test/src/main/scala/org/enso/runtimeversionmanager/test/TestSynchronizer.scala rename to lib/scala/testkit/src/main/scala/org/enso/testkit/TestSynchronizer.scala index a064f27ad7e6..a51d7a640e61 100644 --- a/lib/scala/runtime-version-manager-test/src/main/scala/org/enso/runtimeversionmanager/test/TestSynchronizer.scala +++ b/lib/scala/testkit/src/main/scala/org/enso/testkit/TestSynchronizer.scala @@ -1,9 +1,8 @@ -package org.enso.runtimeversionmanager.test - -import java.util.concurrent.{Semaphore, TimeUnit} +package org.enso.testkit import org.scalatest.exceptions.TestFailedException +import java.util.concurrent.{Semaphore, TimeUnit} import scala.jdk.CollectionConverters._ /** A helper class that can be used to synchronize actions between multiple diff --git a/lib/scala/testkit/src/main/scala/org/enso/testkit/WithTemporaryDirectory.scala b/lib/scala/testkit/src/main/scala/org/enso/testkit/WithTemporaryDirectory.scala index 0b5406502553..5f55a6cbe1e8 100644 --- a/lib/scala/testkit/src/main/scala/org/enso/testkit/WithTemporaryDirectory.scala +++ b/lib/scala/testkit/src/main/scala/org/enso/testkit/WithTemporaryDirectory.scala @@ -17,8 +17,8 @@ trait WithTemporaryDirectory /** @inheritdoc */ override def beforeEach(): Unit = { - super.beforeEach() prepareTemporaryDirectory() + super.beforeEach() } /** @inheritdoc diff --git a/tools/legal-review/engine/report-state b/tools/legal-review/engine/report-state index 1317bc4edfa8..a19b4936b455 100644 --- a/tools/legal-review/engine/report-state +++ b/tools/legal-review/engine/report-state @@ -1,3 +1,3 @@ -74A285A175A34A2A23AC442A666097C0D8C0F8AD4592E80BB1B13F9B6A37B012 +903EC863B0B6FC7E29593D03CE165701370052BED0F98D887E2950BEA128A91D 205455D488F2D8768D4F560EAC46A99C09D50625ECBE6ED4D2D92401752E132D 0