diff --git a/.gitignore b/.gitignore index 89276366..cbb46a65 100644 --- a/.gitignore +++ b/.gitignore @@ -15,4 +15,8 @@ project/plugins/project/ .idea* .bsp .metals -.vscode \ No newline at end of file +.vscode +.bloop +metals.sbt + +/notes.md \ No newline at end of file diff --git a/build.sbt b/build.sbt index e0b75f83..97f920bb 100644 --- a/build.sbt +++ b/build.sbt @@ -24,22 +24,10 @@ compileDocumentation := { (documentation / mdoc).toTask(" --out target/ox-doc").value } -val useRequireIOPlugin = - // Based on: - // https://stackoverflow.com/questions/54660122/how-to-include-a-scala-compiler-plugin-from-a-local-path - Compile / scalacOptions ++= { - val jar = (plugin / Compile / Keys.`package`).value - System.setProperty("sbt.paths.plugin.jar", jar.getAbsolutePath) - - val addPlugin = "-Xplugin:" + jar.getAbsolutePath - val dummy = "-Jdummy=" + jar.lastModified - Seq(addPlugin, dummy) - } - lazy val rootProject = (project in file(".")) .settings(commonSettings) .settings(publishArtifact := false, name := "ox") - .aggregate(core, plugin, pluginTest, examples, kafka, mdcLogback) + .aggregate(core, examples, kafka, mdcLogback) lazy val core: Project = (project in file("core")) .settings(commonSettings) @@ -49,48 +37,6 @@ lazy val core: Project = (project in file("core")) "com.softwaremill.jox" % "channels" % "0.3.1", scalaTest ), - // Check IO usage in core - useRequireIOPlugin, - Test / fork := true - ) - -lazy val plugin: Project = (project in file("plugin")) - .settings(commonSettings) - .settings( - name := "plugin", - libraryDependencies ++= Seq("org.scala-lang" %% "scala3-compiler" % scalaVersion.value, scalaTest) - ) - -lazy val pluginTest: Project = (project in file("plugin-test")) - .settings(commonSettings) - .settings( - name := "plugin-test", - libraryDependencies ++= Seq( - "org.scala-lang" %% "scala3-compiler" % scalaVersion.value, - scalaTest - ), - publishArtifact := false, - // Playground testing - useRequireIOPlugin, - // Unit testing, based on: - // https://github.com/xebia-functional/munit-compiler-toolkit/ - Test / javaOptions += { - val dependencyJars = (Compile / dependencyClasspath).value.files - val thisJar = (Compile / Keys.`package`).value - val `scala-compiler-classpath` = - (dependencyJars :+ thisJar) - .map(_.toPath.toAbsolutePath.toString()) - .mkString(":") - s"-Dscala-compiler-classpath=${`scala-compiler-classpath`}" - }, - Test / javaOptions += Def.taskDyn { - Def.task { - val _ = (plugin / Compile / Keys.`package`).value - val `scala-compiler-options` = - s"${(plugin / Compile / packageBin).value}" - s"""-Dscala-compiler-plugin=${`scala-compiler-options`}""" - } - }.value, Test / fork := true ) @@ -118,8 +64,7 @@ lazy val kafka: Project = (project in file("kafka")) "org.apache.pekko" %% "pekko-connectors-kafka" % "1.0.0" % Test, "org.apache.pekko" %% "pekko-stream" % "1.1.1" % Test, scalaTest - ), - useRequireIOPlugin + ) ) .dependsOn(core) diff --git a/core/src/main/scala/ox/IO.scala b/core/src/main/scala/ox/IO.scala deleted file mode 100644 index b63238f9..00000000 --- a/core/src/main/scala/ox/IO.scala +++ /dev/null @@ -1,36 +0,0 @@ -package ox - -/** Represents a capability to perform I/O operations. The capability should be part of the signature of any method, which either performs - * I/O operations, or calls other methods which perform I/O operations. For example: - * - * {{{ - * def readFromFile(path: String)(using IO): String = ... - * def writeToFile(path: String, content: String)(using IO): Unit = ... 
- * def transform(path: String)(f: String => String)(using IO): Unit = - * writeToFile(path, f(readFromFile(path))) - * }}} - * - * The capability can be introduced using [[IO.unsafe]] or by `import ox.IO.globalForTesting.given`. - * - * Take care not to capture the capability e.g. using constructors (unless you are sure such usage is safe), as this might circumvent the - * tracking of I/O operations. Similarly, the capability might be captured by lambdas, which might later be used when the IO capability is - * not in scope. In future Scala and Ox releases, these problems should be detected at compile-time using the upcoming capture checker. - */ -class IO private[ox] () - -object IO: - private val IOInstance = new IO - - /** Grants the [[IO]] capability when executing the given block of code. Ideally should **only** be used: - * - at the edges of your application (e.g. in the `main` method) - * - when integrating with third-party libraries - * - * In tests, use [[globalForTesting]]. - */ - inline def unsafe[T](f: IO ?=> T): T = f(using IOInstance) - - /** When imported using `import ox.IO.globalForTesting.given`, grants the [[IO]] capability globally within the scope of the import. - * Designed to be used in tests. - */ - object globalForTesting: - given IO = IOInstance diff --git a/core/src/main/scala/ox/OxApp.scala b/core/src/main/scala/ox/OxApp.scala index 092fcea1..caef65a9 100644 --- a/core/src/main/scala/ox/OxApp.scala +++ b/core/src/main/scala/ox/OxApp.scala @@ -24,10 +24,8 @@ enum ExitCode(val code: Int): * * Certain aspects of exception handling can be configured using [[OxApp.Settings]] and overriding the `settings` method. * - * The application's code is specified in a `run` method, which has two capabilities granted: - * - * - [[Ox]], to fork asynchronously computations, and register clean up of resources - * - [[IO]], to perform I/O operations + * The application's code is specified in a `run` method, which has the [[Ox]] capability granted: to fork asynchronously computations, and + * register clean up of resources */ trait OxApp: protected def settings: OxApp.Settings = OxApp.Settings.Default @@ -36,7 +34,7 @@ trait OxApp: try unsupervised { val cancellableMainFork = forkCancellable { - try supervised(IO.unsafe(run(args.toVector))) + try supervised(run(args.toVector)) catch case NonFatal(e) => settings.handleException(e) @@ -67,7 +65,7 @@ trait OxApp: try Runtime.getRuntime.addShutdownHook(thread) catch case _: IllegalStateException => () - def run(args: Vector[String])(using Ox, IO): ExitCode + def run(args: Vector[String])(using Ox): ExitCode end OxApp object OxApp: @@ -107,11 +105,11 @@ object OxApp: /** Simple variant of OxApp does not pass command line arguments and exits with exit code 0 if no exceptions were thrown. */ trait Simple extends OxApp: - override final def run(args: Vector[String])(using Ox, IO): ExitCode = + override final def run(args: Vector[String])(using Ox): ExitCode = run ExitCode.Success - def run(using Ox, IO): Unit + def run(using Ox): Unit /** WithErrorMode variant of OxApp allows to specify what kind of error handling for the main function should be used. Base trait for * integrations. 
@@ -122,7 +120,7 @@ object OxApp: * wrapper type for given ErrorMode */ trait WithErrorMode[E, F[_]](em: ErrorMode[E, F]) extends OxApp: - override final def run(args: Vector[String])(using Ox, IO): ExitCode = + override final def run(args: Vector[String])(using Ox): ExitCode = val result = runWithErrors(args) if em.isError(result) then handleError(em.getError(result)) else ExitCode.Success @@ -133,7 +131,7 @@ object OxApp: /** This template method is to be implemented by abstract classes that add integration for particular error handling data structure of * type F[_]. */ - def runWithErrors(args: Vector[String])(using Ox, IO): F[ExitCode] + def runWithErrors(args: Vector[String])(using Ox): F[ExitCode] end WithErrorMode /** WithEitherErrors variant of OxApp integrates OxApp with an `either` block and allows for usage of `.ok()` combinators in the body of @@ -145,8 +143,8 @@ object OxApp: abstract class WithEitherErrors[E] extends WithErrorMode(EitherMode[E]()): type EitherError[Err] = Label[Either[Err, ExitCode]] - override final def runWithErrors(args: Vector[String])(using Ox, IO): Either[E, ExitCode] = + override final def runWithErrors(args: Vector[String])(using Ox): Either[E, ExitCode] = either[E, ExitCode](run(args)) - def run(args: Vector[String])(using Ox, EitherError[E], IO): ExitCode + def run(args: Vector[String])(using Ox, EitherError[E]): ExitCode end OxApp diff --git a/core/src/main/scala/ox/channels/SourceCompanionIOOps.scala b/core/src/main/scala/ox/channels/SourceCompanionIOOps.scala index f931f873..4d67eef8 100644 --- a/core/src/main/scala/ox/channels/SourceCompanionIOOps.scala +++ b/core/src/main/scala/ox/channels/SourceCompanionIOOps.scala @@ -22,7 +22,7 @@ trait SourceCompanionIOOps: * @return * a `Source` of chunks of bytes. */ - def fromInputStream(is: InputStream, chunkSize: Int = 1024)(using Ox, StageCapacity, IO): Source[Chunk[Byte]] = + def fromInputStream(is: InputStream, chunkSize: Int = 1024)(using Ox, StageCapacity): Source[Chunk[Byte]] = val chunks = StageCapacity.newChannel[Chunk[Byte]] forkPropagate(chunks) { try @@ -54,7 +54,7 @@ trait SourceCompanionIOOps: * @throws SecurityException * If SecurityManager error occurs when opening the file. */ - def fromFile(path: Path, chunkSize: Int = 1024)(using Ox, StageCapacity, IO): Source[Chunk[Byte]] = + def fromFile(path: Path, chunkSize: Int = 1024)(using Ox, StageCapacity): Source[Chunk[Byte]] = if Files.isDirectory(path) then throw new IOException(s"Path $path is a directory") val chunks = StageCapacity.newChannel[Chunk[Byte]] val jFileChannel = diff --git a/core/src/main/scala/ox/channels/SourceIOOps.scala b/core/src/main/scala/ox/channels/SourceIOOps.scala index e66f8d71..24fb90e8 100644 --- a/core/src/main/scala/ox/channels/SourceIOOps.scala +++ b/core/src/main/scala/ox/channels/SourceIOOps.scala @@ -39,7 +39,7 @@ trait SourceIOOps[+T]: * @throws IOException * if an error occurs when writing or closing of the `OutputStream`. */ - def toOutputStream(outputStream: OutputStream)(using T <:< Chunk[Byte], IO): Unit = + def toOutputStream(outputStream: OutputStream)(using T <:< Chunk[Byte]): Unit = repeatWhile { outer.receiveOrClosed() match case ChannelClosed.Done => @@ -65,7 +65,7 @@ trait SourceIOOps[+T]: * @throws IOException * if an error occurs when opening the file or during the write process. 
*/ - def toFile(path: Path)(using T <:< Chunk[Byte], IO): Unit = + def toFile(path: Path)(using T <:< Chunk[Byte]): Unit = if Files.isDirectory(path) then throw new IOException(s"Path $path is a directory") val jFileChannel = try { @@ -93,7 +93,7 @@ trait SourceIOOps[+T]: throw e } - private inline def close(closeable: Closeable, cause: Option[Throwable] = None)(using IO): Unit = + private inline def close(closeable: Closeable, cause: Option[Throwable] = None): Unit = try closeable.close() catch case NonFatal(closeException) => diff --git a/core/src/main/scala/ox/race.scala b/core/src/main/scala/ox/race.scala index bc3b5cdb..695fb7c5 100644 --- a/core/src/main/scala/ox/race.scala +++ b/core/src/main/scala/ox/race.scala @@ -5,7 +5,7 @@ import scala.annotation.tailrec import scala.concurrent.TimeoutException import scala.concurrent.duration.FiniteDuration import scala.util.control.{ControlThrowable, NonFatal} -import scala.util.{Failure, Success, Try} +import scala.util.{Failure, Success} /** A `Some` if the computation `t` took less than `duration`, and `None` otherwise. if the computation `t` throws an exception, it is * propagated. diff --git a/core/src/main/scala/ox/scheduling/scheduled.scala b/core/src/main/scala/ox/scheduling/scheduled.scala index 645c2164..df4162c2 100644 --- a/core/src/main/scala/ox/scheduling/scheduled.scala +++ b/core/src/main/scala/ox/scheduling/scheduled.scala @@ -3,7 +3,7 @@ package ox.scheduling import ox.{EitherMode, ErrorMode, sleep} import scala.annotation.tailrec -import scala.concurrent.duration.{Duration, FiniteDuration, DurationLong} +import scala.concurrent.duration.{FiniteDuration, DurationLong} import scala.util.Try /** The mode that specifies how to interpret the duration provided by the schedule. */ diff --git a/core/src/test/scala/ox/OxAppTest.scala b/core/src/test/scala/ox/OxAppTest.scala index 2c5d4e46..528904cc 100644 --- a/core/src/test/scala/ox/OxAppTest.scala +++ b/core/src/test/scala/ox/OxAppTest.scala @@ -18,7 +18,7 @@ class OxAppTest extends AnyFlatSpec with Matchers: override def exit(exitCode: ExitCode): Unit = ec = exitCode.code - override def run(args: Vector[String])(using Ox, IO): ExitCode = Success + override def run(args: Vector[String])(using Ox): ExitCode = Success Main10.main(Array.empty) @@ -43,7 +43,7 @@ class OxAppTest extends AnyFlatSpec with Matchers: override private[ox] def exit(exitCode: ExitCode): Unit = ec = exitCode.code - override def run(args: Vector[String])(using Ox, IO): ExitCode = + override def run(args: Vector[String])(using Ox): ExitCode = forever: // will never finish sleep(10.millis) @@ -64,7 +64,7 @@ class OxAppTest extends AnyFlatSpec with Matchers: var stackTrace = "" object Main30 extends OxApp: - override def run(args: Vector[String])(using Ox, IO): ExitCode = + override def run(args: Vector[String])(using Ox): ExitCode = Failure(23) override private[ox] def exit(exitCode: ExitCode): Unit = @@ -79,7 +79,7 @@ class OxAppTest extends AnyFlatSpec with Matchers: // failure by throwing an exception object Main31 extends OxApp: - override def run(args: Vector[String])(using Ox, IO): ExitCode = + override def run(args: Vector[String])(using Ox): ExitCode = throw Exception("oh no") override private[ox] def exit(exitCode: ExitCode): Unit = @@ -100,7 +100,7 @@ class OxAppTest extends AnyFlatSpec with Matchers: // failure by throwing an exception in a user fork object Main32 extends OxApp: - override def run(args: Vector[String])(using Ox, IO): ExitCode = + override def run(args: Vector[String])(using Ox): 
ExitCode = forkUser(throw Exception("oh no")) Success @@ -144,7 +144,7 @@ class OxAppTest extends AnyFlatSpec with Matchers: OxApp.Settings.defaultHandleInterruptedException(t => handledExceptions = t :: handledExceptions) ) - override def run(args: Vector[String])(using Ox, IO): ExitCode = + override def run(args: Vector[String])(using Ox): ExitCode = releaseAfterScope: throw new Exception("bye!") @@ -166,7 +166,7 @@ class OxAppTest extends AnyFlatSpec with Matchers: override def exit(exitCode: ExitCode): Unit = ec = exitCode.code - override def run(using Ox, IO): Unit = () + override def run(using Ox): Unit = () Main50.main(Array.empty) @@ -190,7 +190,7 @@ class OxAppTest extends AnyFlatSpec with Matchers: override def exit(exitCode: ExitCode): Unit = ec = exitCode.code - override def run(using Ox, IO): Unit = + override def run(using Ox): Unit = forever: sleep(10.millis) @@ -207,7 +207,7 @@ class OxAppTest extends AnyFlatSpec with Matchers: var stackTrace = "" object Main70 extends OxApp.Simple: - override def run(using Ox, IO): Unit = throw Exception("oh no") + override def run(using Ox): Unit = throw Exception("oh no") override private[ox] def exit(exitCode: ExitCode): Unit = ec = exitCode.code @@ -239,7 +239,7 @@ class OxAppTest extends AnyFlatSpec with Matchers: override def handleError(e: FunException): ExitCode = Failure(e.code) - override def run(args: Vector[String])(using Ox, EitherError[FunException], IO): ExitCode = + override def run(args: Vector[String])(using Ox, EitherError[FunException]): ExitCode = errOrEc.ok() Main80.main(Array.empty) @@ -267,7 +267,7 @@ class OxAppTest extends AnyFlatSpec with Matchers: override private[ox] def exit(exitCode: ExitCode): Unit = ec = exitCode.code - override def run(args: Vector[String])(using Ox, EitherError[FunException], IO): ExitCode = + override def run(args: Vector[String])(using Ox, EitherError[FunException]): ExitCode = forever: // will never finish sleep(10.millis) @@ -287,7 +287,7 @@ class OxAppTest extends AnyFlatSpec with Matchers: var stackTrace = "" object Main100 extends OxApp.WithEitherErrors[FunException]: - override def run(args: Vector[String])(using Ox, EitherError[FunException], IO): ExitCode = + override def run(args: Vector[String])(using Ox, EitherError[FunException]): ExitCode = errOrEc.ok() override private[ox] def exit(exitCode: ExitCode): Unit = @@ -302,7 +302,7 @@ class OxAppTest extends AnyFlatSpec with Matchers: ec = Int.MinValue object Main101 extends OxApp.WithEitherErrors[FunException]: - override def run(args: Vector[String])(using Ox, EitherError[FunException], IO): ExitCode = + override def run(args: Vector[String])(using Ox, EitherError[FunException]): ExitCode = throw Exception("oh no") override private[ox] def exit(exitCode: ExitCode): Unit = diff --git a/core/src/test/scala/ox/channels/SourceCompanionIOOpsTest.scala b/core/src/test/scala/ox/channels/SourceCompanionIOOpsTest.scala index 926215ab..8f2e002b 100644 --- a/core/src/test/scala/ox/channels/SourceCompanionIOOpsTest.scala +++ b/core/src/test/scala/ox/channels/SourceCompanionIOOpsTest.scala @@ -14,8 +14,6 @@ import java.nio.file.Paths import java.io.IOException class SourceCompanionIOOpsTest extends AnyWordSpec with Matchers: - import ox.IO.globalForTesting.given - def emptyInputStream: TestInputStream = new TestInputStream("") def inputStream(text: String, failing: Boolean = false): TestInputStream = new TestInputStream(text, failing) @@ -82,7 +80,7 @@ class SourceCompanionIOOpsTest extends AnyWordSpec with Matchers: private def 
toStrings(source: Source[Chunk[Byte]]): List[String] = source.toList.map(_.asStringUtf8) -class TestInputStream(text: String, throwOnRead: Boolean = false)(using IO) extends ByteArrayInputStream(text.getBytes): +class TestInputStream(text: String, throwOnRead: Boolean = false) extends ByteArrayInputStream(text.getBytes): val closed: AtomicBoolean = new AtomicBoolean(false) override def close(): Unit = diff --git a/core/src/test/scala/ox/channels/SourceIOOpsTest.scala b/core/src/test/scala/ox/channels/SourceIOOpsTest.scala index a3c84023..9cd52358 100644 --- a/core/src/test/scala/ox/channels/SourceIOOpsTest.scala +++ b/core/src/test/scala/ox/channels/SourceIOOpsTest.scala @@ -15,8 +15,6 @@ import java.nio.file.Paths import java.util.concurrent.atomic.AtomicBoolean class SourceIOOpsTest extends AnyWordSpec with Matchers: - import ox.IO.globalForTesting.given - def inputStreamToString(is: InputStream)(using Ox): String = { val source = useInScope(scala.io.Source.fromInputStream(is))(_.close()) source.mkString @@ -171,7 +169,7 @@ class SourceIOOpsTest extends AnyWordSpec with Matchers: private def fileContent(path: Path)(using Ox): List[String] = Source.fromFile(path).toList.map(_.asStringUtf8) -class TestOutputStream(throwOnWrite: Boolean = false)(using IO) extends ByteArrayOutputStream: +class TestOutputStream(throwOnWrite: Boolean = false) extends ByteArrayOutputStream: val closed: AtomicBoolean = new AtomicBoolean(false) override def close(): Unit = diff --git a/core/src/test/scala/ox/channels/SourceTextOpsTest.scala b/core/src/test/scala/ox/channels/SourceTextOpsTest.scala index 4042a095..972721a3 100644 --- a/core/src/test/scala/ox/channels/SourceTextOpsTest.scala +++ b/core/src/test/scala/ox/channels/SourceTextOpsTest.scala @@ -6,8 +6,6 @@ import org.scalatest.wordspec.AnyWordSpec import java.nio.charset.Charset class SourceTextOpsTest extends AnyWordSpec with Matchers { - import ox.IO.globalForTesting.given - "source.linesUtf8" should { "split a single chunk of bytes into lines" in supervised { diff --git a/doc/channels/io.md b/doc/channels/io.md index 17d0cbea..a7387a86 100644 --- a/doc/channels/io.md +++ b/doc/channels/io.md @@ -2,8 +2,6 @@ Ox allows creating a `Source` which reads from a file or `InpuStream`, as well as directing an existing source into a file or an `OutputStream`. These methods work only with a `Source[Chunk[Byte]]`. Ox takes care of closing files/streams after processing and on errors. -All I/O operations require the [IO capability](../io.md). 
- ## InputStream and OutputStream ### Source.fromInputStream @@ -12,18 +10,17 @@ An `InputStream` can be converted to a `Source[Chunk[Byte]]`: ```scala mdoc:compile-only import ox.channels.Source -import ox.{IO, supervised} +import ox.supervised import java.io.ByteArrayInputStream import java.io.InputStream val inputStream: InputStream = new ByteArrayInputStream("some input".getBytes) supervised { - IO.unsafe: - Source - .fromInputStream(inputStream) // Source[Chunk[Byte]] - .decodeStringUtf8 - .map(_.toUpperCase) - .foreach(println) // "SOME INPUT" + Source + .fromInputStream(inputStream) // Source[Chunk[Byte]] + .decodeStringUtf8 + .map(_.toUpperCase) + .foreach(println) // "SOME INPUT" } ``` @@ -32,18 +29,17 @@ You can define a custom chunk size instead of using the default: ```scala mdoc:compile-only import ox.channels.Source -import ox.{IO, supervised} +import ox.supervised import java.io.ByteArrayInputStream import java.io.InputStream val inputStream: InputStream = new ByteArrayInputStream("some input".getBytes) supervised { - IO.unsafe: - Source - .fromInputStream(inputStream, chunkSize = 4) // Source[Chunk[Byte]] - .decodeStringUtf8 - .map(_.toUpperCase) - .foreach(println) // "SOME", " INPUT" + Source + .fromInputStream(inputStream, chunkSize = 4) // Source[Chunk[Byte]] + .decodeStringUtf8 + .map(_.toUpperCase) + .foreach(println) // "SOME", " INPUT" } ``` @@ -53,16 +49,15 @@ A `Source[Chunk[Byte]]` can be directed to write to an `OutputStream`: ```scala mdoc:compile-only import ox.channels.Source -import ox.{IO, supervised} +import ox.supervised import java.io.ByteArrayOutputStream val outputStream = new ByteArrayOutputStream() supervised { val source = Source.fromIterable(List("text1,", "text2")) - IO.unsafe: - source - .encodeUtf8 - .toOutputStream(outputStream) + source + .encodeUtf8 + .toOutputStream(outputStream) } outputStream.toString // "TEXT1,TEXT2" ``` @@ -75,16 +70,15 @@ You can obtain a `Source` of byte chunks read from a file for a given path: ```scala mdoc:compile-only import ox.channels.Source -import ox.{IO, supervised} +import ox.supervised import java.nio.file.Paths supervised { - IO.unsafe: - Source - .fromFile(Paths.get("/path/to/my/file.txt")) - .linesUtf8 - .map(_.toUpperCase) - .toList // List("FILE_LINE1", "FILE_LINE2") + Source + .fromFile(Paths.get("/path/to/my/file.txt")) + .linesUtf8 + .map(_.toUpperCase) + .toList // List("FILE_LINE1", "FILE_LINE2") } ``` @@ -97,14 +91,13 @@ A `Source[Chunk[Byte]]` can be written to a file under a given path: ```scala mdoc:compile-only import ox.channels.Source -import ox.{IO, supervised} +import ox.supervised import java.nio.file.Paths supervised { val source = Source.fromIterable(List("text1,", "text2")) - IO.unsafe: - source - .encodeUtf8 - .toFile(Paths.get("/path/to/my/target/file.txt")) + source + .encodeUtf8 + .toFile(Paths.get("/path/to/my/target/file.txt")) } ``` diff --git a/doc/index.md b/doc/index.md index 8a55eb53..694d1632 100644 --- a/doc/index.md +++ b/doc/index.md @@ -45,10 +45,9 @@ In addition to this documentation, ScalaDocs can be browsed at [https://javadoc. .. toctree:: :maxdepth: 2 - :caption: Resiliency, I/O & utilities + :caption: Resiliency & utilities oxapp - io retries repeat scheduled diff --git a/doc/io.md b/doc/io.md deleted file mode 100644 index 62a9365a..00000000 --- a/doc/io.md +++ /dev/null @@ -1,140 +0,0 @@ -# I/O - -Ox includes the `IO` capability, which is designed to be part of the signature of any method, which performs I/O -either directly or indirectly. 
The goal is for method signatures to be truthful, and specify the possible side effects, -failure modes and timing in a reasonably precise and practical way. For example: - -```scala mdoc:compile-only -import ox.IO - -def readFromFile(path: String)(using IO): String = ??? -def writeToFile(path: String, content: String)(using IO): Unit = ??? -def transform(path: String)(f: String => String)(using IO): Unit = - writeToFile(path, f(readFromFile(path))) -``` - -In other words, the presence of a `using IO` parameter indicates that the method might: - -* have side effects: write to a file, send a network request, read from a database, etc. -* take a non-trivial amount of time to complete due to blocking, data transfer, etc. -* throw an exception (unless the exceptions are handled, and e.g. transformed into - [application errors](basics/error-handling.md)) - -Quite importantly, the **absence** of `using IO` specifies that the method has **no** I/O side effects (however, it -might still block the thread, e.g. when using a channel, or have other side effects, such as throwing exceptions, -accessing the current time, or generating a random number). Compiler assists in checking this property, but only to a -certain degree - it's possible to cheat! - -The `IO` capability can be introduced using `IO.unsafe`. Ideally, this method should only be used at the edges of your -application (e.g. in the `main` method), or when integrating with third-party libraries. Otherwise, the capability -should be passed as an implicit parameter. Such an ideal scenario might not possible, but there's still value in `IO` -tracking: by looking up the usages of `IO.unsafe` it's possible to quickly find the "roots" where the `IO` capability -is introduced. For example: - -```scala -import ox.IO - -def sendHTTPRequest(body: String)(using IO): String = ??? - -@main def run(): Unit = - IO.unsafe: - sendHTTPRequest("Hello, world!") -``` - -For testing purposes, instead of using `IO.unsafe`, there's a special import which grants the capability within the -scope of the import. By having different mechanisms for introducing `IO` in production and test code, test usages don't -pollute the search results, when verifying `IO.unsafe` usages (which should be as limited as possible). For example: - -```scala mdoc:compile-only -import ox.IO -import ox.IO.globalForTesting.given - -def myMethod()(using IO): Unit = ??? - -def testMyMethod(): Unit = myMethod() -``` - -Take care not to capture the capability e.g. using constructors (unless you are sure such usage is safe), as this might -circumvent the tracking of I/O operations. Similarly, the capability might be captured by lambdas, which might later be -used when the IO capability is not in scope. In future Scala and Ox releases, these problems should be detected at -compile-time using the upcoming capture checker. - -## The requireIO compiler plugin - -Ox provides a compiler plugin, which verifies at compile-time that the `IO` capability is present when invoking any -methods from the JDK or Java libraries that specify to throw an IO-related exception, such as `java.io.IOException`. 
- -To use the plugin, add the following settings to your sbt configuration: - -```scala -autoCompilerPlugins := true -addCompilerPlugin("com.softwaremill.ox" %% "plugin" % "@VERSION@") -``` - -For scala-cli: - -```scala -//> using plugin com.softwaremill.ox:::plugin:@VERSION@ -``` - -With the plugin enabled, the following code won't compile: - -```scala -import java.io.InputStream - -object Test: - def test(): Unit = - val is: InputStream = ??? - is.read() - -/* -[error] -- Error: Test.scala:8:11 -[error] 8 | is.read() -[error] | ^^^^^^^^^ -[error] |The `java.io.InputStream.read` method throws an `java.io.IOException`, -[error] |but the `ox.IO` capability is not available in the implicit scope. -[error] | -[error] |Try adding a `using IO` clause to the enclosing method. - */ -``` - -You can think of the plugin as a way to translate between the effect system that is part of Java - checked exceptions - -and the `IO` effect specified by Ox. Note that only usages of Java methods which have the proper `throws` clauses will -be checked (or of Scala methods, which have the `@throws` annotation). - -```{note} -If you are using a Scala library that uses Java's I/O under the covers, such usages can't (and won't) be -checked by the plugin. The scope of the plugin is currently limited to the JDK and Java libraries only. -``` - -### Other I/O exceptions - -In some cases, libraries wrap I/O exceptions in their own types. It's possible to configure the plugin to require the -`IO` capability for such exceptions as well. In order to do so, you need to pass the fully qualified names of these -exceptions as a compiler plugin option, each class in a separate option. For example, in sbt: - -```scala -Compile / scalacOptions += "-P:requireIO:com.example.MyIOException" -``` - -In a scala-cli directive: - -```bash -//> using option -P:requireIO:com.example.MyIOException -``` - -Currently, by default the plugin checks for the following exceptions: - -* `java.io.IOException` -* `java.sql.SQLException` - -## Potential benefits of tracking methods that perform I/O - -Tracking which methods perform I/O using the `IO` capability has the only benefit of giving you method signatures, -which carry more information. In other words: more type safety. The specific benefits might include: - -* better code readability (what does this method do? -* local reasoning (does this method perform I/O?) -* safer refactoring (adding I/O to a previously pure method triggers errors in the compiler, you need to consciously add the capability) -* documentation through types (an IO method can take a longer time, have side-effects) -* possible failure modes (an IO method might throw an exception) diff --git a/doc/kafka.md b/doc/kafka.md index 210b3c95..7936fb28 100644 --- a/doc/kafka.md +++ b/doc/kafka.md @@ -11,24 +11,21 @@ the `KafkaSource`, `KafkaStage` and `KafkaDrain` objects. In all cases either a `KafkaProducer` / `KafkaConsumer` is needed, or `ProducerSettings` / `ConsumerSetttings` need to be provided with the bootstrap servers, consumer group id, key / value serializers, etc. -All Kafka I/O operations require the [IO capability](io.md). 
- To read from a Kafka topic, use: ```scala mdoc:compile-only import ox.channels.ChannelClosed import ox.kafka.{ConsumerSettings, KafkaSource, ReceivedMessage} import ox.kafka.ConsumerSettings.AutoOffsetReset -import ox.{IO, supervised} +import ox.supervised supervised { val settings = ConsumerSettings.default("my_group").bootstrapServers("localhost:9092").autoOffsetReset(AutoOffsetReset.Earliest) val topic = "my_topic" - IO.unsafe: - val source = KafkaSource.subscribe(settings, topic) - - source.receive(): ReceivedMessage[String, String] | ChannelClosed + val source = KafkaSource.subscribe(settings, topic) + + source.receive(): ReceivedMessage[String, String] | ChannelClosed } ``` @@ -37,16 +34,15 @@ To publish data to a Kafka topic: ```scala mdoc:compile-only import ox.channels.Source import ox.kafka.{ProducerSettings, KafkaDrain} -import ox.{IO, pipe, supervised} +import ox.{pipe, supervised} import org.apache.kafka.clients.producer.ProducerRecord supervised { val settings = ProducerSettings.default.bootstrapServers("localhost:9092") - IO.unsafe: - Source - .fromIterable(List("a", "b", "c")) - .mapAsView(msg => ProducerRecord[String, String]("my_topic", msg)) - .pipe(KafkaDrain.publish(settings)) + Source + .fromIterable(List("a", "b", "c")) + .mapAsView(msg => ProducerRecord[String, String]("my_topic", msg)) + .pipe(KafkaDrain.publish(settings)) } ``` @@ -71,7 +67,7 @@ computed. For example: ```scala mdoc:compile-only import ox.kafka.{ConsumerSettings, KafkaDrain, KafkaSource, ProducerSettings, SendPacket} import ox.kafka.ConsumerSettings.AutoOffsetReset -import ox.{IO, pipe, supervised} +import ox.{pipe, supervised} import org.apache.kafka.clients.producer.ProducerRecord supervised { @@ -80,12 +76,11 @@ supervised { val sourceTopic = "source_topic" val destTopic = "dest_topic" - IO.unsafe: - KafkaSource - .subscribe(consumerSettings, sourceTopic) - .map(in => (in.value.toLong * 2, in)) - .map((value, original) => SendPacket(ProducerRecord[String, String](destTopic, value.toString), original)) - .pipe(KafkaDrain.publishAndCommit(producerSettings)) + KafkaSource + .subscribe(consumerSettings, sourceTopic) + .map(in => (in.value.toLong * 2, in)) + .map((value, original) => SendPacket(ProducerRecord[String, String](destTopic, value.toString), original)) + .pipe(KafkaDrain.publishAndCommit(producerSettings)) } ``` @@ -97,17 +92,16 @@ To publish data as a mapping stage: import ox.channels.Source import ox.kafka.ProducerSettings import ox.kafka.KafkaStage.* -import ox.{IO, supervised} +import ox.supervised import org.apache.kafka.clients.producer.{ProducerRecord, RecordMetadata} supervised { val settings = ProducerSettings.default.bootstrapServers("localhost:9092") - IO.unsafe: - val metadatas: Source[RecordMetadata] = Source - .fromIterable(List("a", "b", "c")) - .mapAsView(msg => ProducerRecord[String, String]("my_topic", msg)) - .mapPublish(settings) - - // process the metadatas source further + val metadatas: Source[RecordMetadata] = Source + .fromIterable(List("a", "b", "c")) + .mapAsView(msg => ProducerRecord[String, String]("my_topic", msg)) + .mapPublish(settings) + + // process the metadatas source further } ``` diff --git a/doc/oxapp.md b/doc/oxapp.md index 6ae96335..4384085c 100644 --- a/doc/oxapp.md +++ b/doc/oxapp.md @@ -1,8 +1,8 @@ # OxApp To properly handle application interruption and clean shutdown, Ox provides a way to define application entry points -using `OxApp` trait. 
The application's main `run` function is then executed on a virtual thread, with a root `Ox` and -`IO` capabilities provided. +using `OxApp` trait. The application's main `run` function is then executed on a virtual thread, with a root `Ox` +capability provided. Here's an example: @@ -11,7 +11,7 @@ import ox.* import scala.concurrent.duration.* object MyApp extends OxApp: - def run(args: Vector[String])(using Ox, IO): ExitCode = + def run(args: Vector[String])(using Ox): ExitCode = forkUser { sleep(500.millis) println("Fork finished!") @@ -30,14 +30,14 @@ In the code below, the resource is released when the application is interrupted: import ox.* object MyApp extends OxApp: - def run(args: Vector[String])(using Ox, IO): ExitCode = + def run(args: Vector[String])(using Ox): ExitCode = releaseAfterScope: println("Releasing ...") println("Waiting ...") never ``` -The `run` function receives command line arguments as a `Vector` of `String`s, a given `Ox` and `IO` capabilities and +The `run` function receives command line arguments as a `Vector` of `String`s, a given `Ox` capability and has to return an `ox.ExitCode` value which translates to the exit code returned from the program. `ox.ExitCode` is defined as: @@ -48,14 +48,14 @@ enum ExitCode(val code: Int): ``` There's also a simplified variant of `OxApp` for situations where you don't care about command line arguments. -The `run` function doesn't take any arguments beyond the root `Ox` and `IO` capabilities, expects no `ExitCode` and will +The `run` function doesn't take any arguments beyond the root `Ox` capability, expects no `ExitCode` and will handle any exceptions thrown by printing a stack trace and returning an exit code of `1`: ```scala mdoc:compile-only import ox.* object MyApp extends OxApp.Simple: - def run(using Ox, IO): Unit = println("All done!") + def run(using Ox): Unit = println("All done!") ``` `OxApp` has also a variant that integrates with [either](basics/error-handling.md#boundary-break-for-eithers) @@ -80,7 +80,7 @@ object MyApp extends OxApp.WithEitherErrors[MyAppError]: case ComputationError(_) => ExitCode.Failure(23) } - def run(args: Vector[String])(using Ox, EitherError[MyAppError], IO): ExitCode = + def run(args: Vector[String])(using Ox, EitherError[MyAppError]): ExitCode = doWork().ok() // will end the scope with MyAppError as `doWork` returns a Left ExitCode.Success ``` @@ -106,7 +106,7 @@ object MyApp extends OxApp: interruptedExitCode = ExitCode.Failure(130) ) - def run(args: Vector[String])(using Ox, IO): ExitCode = + def run(args: Vector[String])(using Ox): ExitCode = sleep(60.seconds) ExitCode.Success ``` diff --git a/kafka/src/main/scala/ox/kafka/KafkaConsumerWrapper.scala b/kafka/src/main/scala/ox/kafka/KafkaConsumerWrapper.scala index 2f8bef98..cf4521ea 100644 --- a/kafka/src/main/scala/ox/kafka/KafkaConsumerWrapper.scala +++ b/kafka/src/main/scala/ox/kafka/KafkaConsumerWrapper.scala @@ -9,30 +9,30 @@ import ox.channels.* import scala.jdk.CollectionConverters.* trait KafkaConsumerWrapper[K, V]: - def subscribe(topics: Seq[String])(using IO): Unit - def poll()(using IO): ConsumerRecords[K, V] - def commit(offsets: Map[TopicPartition, Long])(using IO): Unit + def subscribe(topics: Seq[String]): Unit + def poll(): ConsumerRecords[K, V] + def commit(offsets: Map[TopicPartition, Long]): Unit object KafkaConsumerWrapper: private val logger = LoggerFactory.getLogger(classOf[KafkaConsumerWrapper.type]) def apply[K, V](consumer: KafkaConsumer[K, V], closeWhenComplete: Boolean)(using Ox): 
ActorRef[KafkaConsumerWrapper[K, V]] = val logic = new KafkaConsumerWrapper[K, V]: - override def subscribe(topics: Seq[String])(using IO): Unit = + override def subscribe(topics: Seq[String]): Unit = try consumer.subscribe(topics.asJava) catch case t: Throwable => logger.error(s"Exception when subscribing to $topics", t) throw t - override def poll()(using IO): ConsumerRecords[K, V] = + override def poll(): ConsumerRecords[K, V] = try consumer.poll(java.time.Duration.ofMillis(100)) catch case t: Throwable => logger.error("Exception when polling for records in Kafka", t) throw t - override def commit(offsets: Map[TopicPartition, Long])(using IO): Unit = + override def commit(offsets: Map[TopicPartition, Long]): Unit = try consumer.commitSync(offsets.view.mapValues(o => new OffsetAndMetadata(o + 1)).toMap.asJava) catch case t: Throwable => diff --git a/kafka/src/main/scala/ox/kafka/KafkaDrain.scala b/kafka/src/main/scala/ox/kafka/KafkaDrain.scala index 31344144..bcb27555 100644 --- a/kafka/src/main/scala/ox/kafka/KafkaDrain.scala +++ b/kafka/src/main/scala/ox/kafka/KafkaDrain.scala @@ -10,10 +10,10 @@ import scala.jdk.CollectionConverters.* object KafkaDrain: private val logger = LoggerFactory.getLogger(classOf[KafkaDrain.type]) - def publish[K, V](settings: ProducerSettings[K, V])(using IO): Source[ProducerRecord[K, V]] => Unit = source => + def publish[K, V](settings: ProducerSettings[K, V]): Source[ProducerRecord[K, V]] => Unit = source => publish(settings.toProducer, closeWhenComplete = true)(source) - def publish[K, V](producer: KafkaProducer[K, V], closeWhenComplete: Boolean)(using IO): Source[ProducerRecord[K, V]] => Unit = source => + def publish[K, V](producer: KafkaProducer[K, V], closeWhenComplete: Boolean): Source[ProducerRecord[K, V]] => Unit = source => // if sending multiple records ends in an exception, we'll receive at most one anyway; we don't want to block the // producers, hence creating an unbounded channel val producerExceptions = Channel.unlimited[Throwable] @@ -42,7 +42,7 @@ object KafkaDrain: * A drain, which consumes all packets from the provided `Source`.. For each packet, first all `send` messages (producer records) are * sent. Then, all `commit` messages (consumer records) up to their offsets are committed. */ - def publishAndCommit[K, V](producerSettings: ProducerSettings[K, V]): Source[SendPacket[K, V]] => IO ?=> Unit = + def publishAndCommit[K, V](producerSettings: ProducerSettings[K, V]): Source[SendPacket[K, V]] => Unit = source => publishAndCommit(producerSettings.toProducer, closeWhenComplete = true)(source) /** @param producer @@ -51,7 +51,7 @@ object KafkaDrain: * A drain, which consumes all packets from the provided `Source`.. For each packet, first all `send` messages (producer records) are * sent. Then, all `commit` messages (consumer records) up to their offsets are committed. 
*/ - def publishAndCommit[K, V](producer: KafkaProducer[K, V], closeWhenComplete: Boolean): Source[SendPacket[K, V]] => IO ?=> Unit = source => + def publishAndCommit[K, V](producer: KafkaProducer[K, V], closeWhenComplete: Boolean): Source[SendPacket[K, V]] => Unit = source => supervised { import KafkaStage.* source.mapPublishAndCommit(producer, closeWhenComplete).drain() diff --git a/kafka/src/main/scala/ox/kafka/KafkaSource.scala b/kafka/src/main/scala/ox/kafka/KafkaSource.scala index d58dc050..876bf397 100644 --- a/kafka/src/main/scala/ox/kafka/KafkaSource.scala +++ b/kafka/src/main/scala/ox/kafka/KafkaSource.scala @@ -10,21 +10,18 @@ object KafkaSource: def subscribe[K, V](settings: ConsumerSettings[K, V], topic: String, otherTopics: String*)(using StageCapacity, - Ox, - IO + Ox ): Source[ReceivedMessage[K, V]] = subscribe(settings.toConsumer, closeWhenComplete = true, topic, otherTopics: _*) def subscribe[K, V](kafkaConsumer: KafkaConsumer[K, V], closeWhenComplete: Boolean, topic: String, otherTopics: String*)(using StageCapacity, - Ox, - IO + Ox ): Source[ReceivedMessage[K, V]] = subscribe(KafkaConsumerWrapper(kafkaConsumer, closeWhenComplete), topic, otherTopics: _*) def subscribe[K, V](kafkaConsumer: ActorRef[KafkaConsumerWrapper[K, V]], topic: String, otherTopics: String*)(using StageCapacity, - Ox, - IO + Ox ): Source[ReceivedMessage[K, V]] = kafkaConsumer.tell(_.subscribe(topic :: otherTopics.toList)) diff --git a/kafka/src/main/scala/ox/kafka/KafkaStage.scala b/kafka/src/main/scala/ox/kafka/KafkaStage.scala index 8e76c5b8..3f26bd74 100644 --- a/kafka/src/main/scala/ox/kafka/KafkaStage.scala +++ b/kafka/src/main/scala/ox/kafka/KafkaStage.scala @@ -19,7 +19,7 @@ object KafkaStage: * @return * A stream of published records metadata, in the order in which the [[ProducerRecord]]s are received. */ - def mapPublish(settings: ProducerSettings[K, V])(using StageCapacity, Ox, IO): Source[RecordMetadata] = + def mapPublish(settings: ProducerSettings[K, V])(using StageCapacity, Ox): Source[RecordMetadata] = mapPublish(settings.toProducer, closeWhenComplete = true) /** Publish the messages using the given `producer`. The producer is closed depending on the `closeWhenComplete` flag, after all @@ -28,7 +28,7 @@ object KafkaStage: * @return * A stream of published records metadata, in the order in which the [[ProducerRecord]]s are received. */ - def mapPublish(producer: KafkaProducer[K, V], closeWhenComplete: Boolean)(using StageCapacity, Ox, IO): Source[RecordMetadata] = + def mapPublish(producer: KafkaProducer[K, V], closeWhenComplete: Boolean)(using StageCapacity, Ox): Source[RecordMetadata] = source.mapAsView(r => SendPacket(List(r), Nil)).mapPublishAndCommit(producer, closeWhenComplete, commitOffsets = false) extension [K, V](source: Source[SendPacket[K, V]]) @@ -39,7 +39,7 @@ object KafkaStage: * @return * A stream of published records metadata, in the order in which the [[SendPacket]]s are received. */ - def mapPublishAndCommit(producerSettings: ProducerSettings[K, V])(using StageCapacity, Ox, IO): Source[RecordMetadata] = + def mapPublishAndCommit(producerSettings: ProducerSettings[K, V])(using StageCapacity, Ox): Source[RecordMetadata] = mapPublishAndCommit(producerSettings.toProducer, closeWhenComplete = true) /** For each packet, first all messages (producer records) are sent, using the given `producer`. 
Then, all messages from @@ -54,15 +54,13 @@ object KafkaStage: */ def mapPublishAndCommit(producer: KafkaProducer[K, V], closeWhenComplete: Boolean)(using StageCapacity, - Ox, - IO + Ox ): Source[RecordMetadata] = mapPublishAndCommit(producer, closeWhenComplete, commitOffsets = true) private def mapPublishAndCommit(producer: KafkaProducer[K, V], closeWhenComplete: Boolean, commitOffsets: Boolean)(using StageCapacity, - Ox, - IO + Ox ): Source[RecordMetadata] = // source - the upstream from which packets are received @@ -130,7 +128,7 @@ object KafkaStage: exceptions: Sink[Exception], metadata: Sink[(Long, RecordMetadata)], commitOffsets: Boolean - )(using IO): Unit = + ): Unit = val leftToSend = new AtomicInteger(packet.send.size) packet.send.foreach { toSend => val sequenceNo = sendInSequence.nextSequenceNo diff --git a/kafka/src/main/scala/ox/kafka/kafkaOffsetCommit.scala b/kafka/src/main/scala/ox/kafka/kafkaOffsetCommit.scala index 92cac30c..14eaf4f4 100644 --- a/kafka/src/main/scala/ox/kafka/kafkaOffsetCommit.scala +++ b/kafka/src/main/scala/ox/kafka/kafkaOffsetCommit.scala @@ -8,7 +8,7 @@ import ox.channels.* import scala.collection.mutable import scala.concurrent.duration.* -private[kafka] def doCommit(packets: Source[SendPacket[_, _]])(using Ox, IO): Unit = +private[kafka] def doCommit(packets: Source[SendPacket[_, _]])(using Ox): Unit = val commitInterval = 1.second val ticks = Source.tick(commitInterval) val toCommit = mutable.Map[TopicPartition, Long]() diff --git a/kafka/src/test/scala/ox/kafka/KafkaTest.scala b/kafka/src/test/scala/ox/kafka/KafkaTest.scala index cfc58a2e..c6e6ea5e 100644 --- a/kafka/src/test/scala/ox/kafka/KafkaTest.scala +++ b/kafka/src/test/scala/ox/kafka/KafkaTest.scala @@ -13,8 +13,6 @@ import ox.* import scala.concurrent.duration.* class KafkaTest extends AnyFlatSpec with Matchers with EmbeddedKafka with BeforeAndAfterAll { - import ox.IO.globalForTesting.given - private var bootstrapServer: String = _ override def beforeAll(): Unit = diff --git a/kafka/src/test/scala/ox/kafka/manual/publish.scala b/kafka/src/test/scala/ox/kafka/manual/publish.scala index 83a41689..e073c1bf 100644 --- a/kafka/src/test/scala/ox/kafka/manual/publish.scala +++ b/kafka/src/test/scala/ox/kafka/manual/publish.scala @@ -14,13 +14,12 @@ import ox.kafka.* val bootstrapServer = "localhost:29092" val settings = ProducerSettings.default.bootstrapServers(bootstrapServer) - IO.unsafe: - Source - .unfold(())(_ => Some((randomString(), ()))) - // 100 bytes * 10000000 = 1 GB - .take(10_000_000) - .mapAsView(msg => ProducerRecord[String, String](topic, msg)) - .mapPublish(settings) - .drain() + Source + .unfold(())(_ => Some((randomString(), ()))) + // 100 bytes * 10000000 = 1 GB + .take(10_000_000) + .mapAsView(msg => ProducerRecord[String, String](topic, msg)) + .mapPublish(settings) + .drain() } } diff --git a/kafka/src/test/scala/ox/kafka/manual/transfer.scala b/kafka/src/test/scala/ox/kafka/manual/transfer.scala index 146b9c28..39a0659f 100644 --- a/kafka/src/test/scala/ox/kafka/manual/transfer.scala +++ b/kafka/src/test/scala/ox/kafka/manual/transfer.scala @@ -18,13 +18,12 @@ import ox.kafka.ConsumerSettings.AutoOffsetReset val bootstrapServer = "localhost:29092" val consumerSettings = ConsumerSettings.default(group).bootstrapServers(bootstrapServer).autoOffsetReset(AutoOffsetReset.Earliest) val producerSettings = ProducerSettings.default.bootstrapServers(bootstrapServer) - IO.unsafe: - KafkaSource - .subscribe(consumerSettings, sourceTopic) - .take(10_000_000) - .map(in => 
(in.value.reverse, in)) - .map((value, original) => SendPacket(ProducerRecord[String, String](destTopic, value), original)) - .mapPublishAndCommit(producerSettings) - .drain() + KafkaSource + .subscribe(consumerSettings, sourceTopic) + .take(10_000_000) + .map(in => (in.value.reverse, in)) + .map((value, original) => SendPacket(ProducerRecord[String, String](destTopic, value), original)) + .mapPublishAndCommit(producerSettings) + .drain() } } diff --git a/plugin-test/src/main/scala-3/ox/IO.scala b/plugin-test/src/main/scala-3/ox/IO.scala deleted file mode 100644 index 0456fc40..00000000 --- a/plugin-test/src/main/scala-3/ox/IO.scala +++ /dev/null @@ -1,8 +0,0 @@ -package ox - -// we don't depend on core, so we need to provide a "dummy" IO for tests - -trait IO - -object IO: - def unsafe[T](op: IO ?=> T): T = ??? diff --git a/plugin-test/src/test/scala-3/ox/plugin/test/RequireIOTest.scala b/plugin-test/src/test/scala-3/ox/plugin/test/RequireIOTest.scala deleted file mode 100644 index 895696e0..00000000 --- a/plugin-test/src/test/scala-3/ox/plugin/test/RequireIOTest.scala +++ /dev/null @@ -1,147 +0,0 @@ -package ox.plugin.test - -import dotty.tools.dotc.Compiler -import dotty.tools.dotc.core.Contexts.{Context, ContextBase} -import dotty.tools.dotc.reporting.{Diagnostic, TestingReporter} -import org.scalatest.funspec.AnyFunSpec -import org.scalatest.matchers.should.Matchers - -import scala.util.Properties - -class RequireIOTest extends AnyFunSpec with Matchers: - private def compilerContext(pluginOptions: List[String]): Context = - val base = new ContextBase {} - val compilerClasspath = Properties.propOrEmpty("scala-compiler-classpath") ++ s":${Properties.propOrEmpty("scala-compiler-plugin")}" - val context = base.initialCtx.fresh - context.setSetting(context.settings.classpath, compilerClasspath) - context.setSetting(context.settings.plugin, List(Properties.propOrEmpty("scala-compiler-plugin"))) - context.setSetting(context.settings.pluginOptions, pluginOptions) - context.setReporter(new TestingReporter) - base.initialize()(using context) - context - - private def compile(source: String, pluginOptions: List[String]): Seq[Diagnostic.Error] = - given Context = compilerContext(pluginOptions) - val c = new Compiler - val run = c.newRun - run.compileFromStrings(List(source)) - run.runContext.reporter.allErrors - - private def checkCompiles(source: String, pluginOptions: List[String] = Nil): Unit = - val _ = compile(source, pluginOptions) shouldBe empty - - private def checkDoesNotCompile(source: String, pluginOptions: List[String] = Nil): String = - val result = compile(source, pluginOptions) - result should not be empty - result.map(_.msg).mkString("\n") - - val expectedError = "The `java.io.InputStream.read` method throws an `java.io.IOException`" - - describe("should not compile") { - it("direct invocations") { - checkDoesNotCompile(""" - |import java.io.InputStream - | - |object Test: - | def test(): Unit = - | val is: InputStream = ??? - | is.read() - |""".stripMargin) should include(expectedError) - } - - it("using IO.unsafe outside of the invocation") { - checkDoesNotCompile(""" - |import java.io.InputStream - |import ox.IO - | - |object Test { - | def test(): Unit = { - | IO.unsafe { - | } - | val is: InputStream = ??? 
- | is.read() - | } - |}""".stripMargin) should include(expectedError) - } - - it("capability in another method") { - checkDoesNotCompile(""" - |import java.io.InputStream - |import ox.IO - | - |object Test { - | def test()(using IO): Unit = {} - | def test2(): Unit = { - | val is: InputStream = ??? - | is.read() - | } - | def test3()(using IO): Unit = {} - |}""".stripMargin) should include(expectedError) - } - - it("another exception, when configured to require IO") { - checkDoesNotCompile( - """ - |object Test { - | def test(): Unit = { - | val f: java.util.concurrent.Future[String] = ??? - | f.get() - | } - |}""".stripMargin, - List("requireIO:java.util.concurrent.ExecutionException") - ) should include("The `java.util.concurrent.Future.get` method throws an `java.util.concurrent.ExecutionException`") - } - } - - describe("should compile") { - it("direct invocations with the capability available") { - checkCompiles(""" - |import java.io.InputStream - |import ox.IO - | - |object Test2: - | def test()(using IO): Unit = - | val is: InputStream = ??? - | is.read() - |""".stripMargin) - } - - it("using IO.unsafe") { - checkCompiles(""" - |import java.io.InputStream - |import ox.IO - | - |object Test5 { - | def test(): Unit = { - | IO.unsafe { - | val is: InputStream = ??? - | is.read() - | } - | } - |}""".stripMargin) - } - - it("using external capability") { - checkCompiles(""" - |import java.io.InputStream - |import ox.IO - | - |object Test3 { - | given IO = new IO {} - | def test(): Unit = { - | val is: InputStream = ??? - | is.read() - | } - |}""".stripMargin) - } - - it("another exception, when not configured to require IO") { - checkCompiles(""" - |object Test { - | def test(): Unit = { - | val f: java.util.concurrent.Future[String] = ??? - | f.get() - | } - |}""".stripMargin) - } - } diff --git a/plugin/src/main/resources/plugin.properties b/plugin/src/main/resources/plugin.properties deleted file mode 100644 index a7063a86..00000000 --- a/plugin/src/main/resources/plugin.properties +++ /dev/null @@ -1 +0,0 @@ -pluginClass = ox.plugin.RequireIO diff --git a/plugin/src/main/scala/ox/plugin/RequireIO.scala b/plugin/src/main/scala/ox/plugin/RequireIO.scala deleted file mode 100644 index b0d870b4..00000000 --- a/plugin/src/main/scala/ox/plugin/RequireIO.scala +++ /dev/null @@ -1,78 +0,0 @@ -package ox.plugin - -import dotty.tools.dotc.ast.Trees.* -import dotty.tools.dotc.ast.tpd -import dotty.tools.dotc.core.Annotations.Annotation -import dotty.tools.dotc.core.Contexts.Context -import dotty.tools.dotc.core.{Flags, Symbols} -import dotty.tools.dotc.core.Symbols.* -import dotty.tools.dotc.plugins.{PluginPhase, StandardPlugin} -import dotty.tools.dotc.report -import dotty.tools.dotc.transform.{Pickler, PostTyper} -import dotty.tools.dotc.util.Property - -class RequireIO extends StandardPlugin: - val name = "requireIO" - override val description = "Require the IO capability when a Java method throws an IOException." 
- - def init(options: List[String]): List[PluginPhase] = new RequireIOPhase(options) :: Nil - -class RequireIOPhase(ioLikeExceptionClasses: List[String]) extends PluginPhase: - import tpd.* - - val phaseName = "requireIO" - - override val runsAfter: Set[String] = Set(PostTyper.name) - override val runsBefore: Set[String] = Set(Pickler.name) - - override def allowsImplicitSearch: Boolean = true - - // exceptions, which signal that a method performs I/O - private var ioLikeExceptions: List[Symbol] = _ - private var io: Symbol = _ - - override def run(using Context): Unit = { - ioLikeExceptions = ("java.io.IOException" :: "java.sql.SQLException" :: ioLikeExceptionClasses).map(requiredClass) - io = requiredClass("ox.IO") - - super.run - } - - private object ioAvailableProperty extends Property.Key[Boolean] - - override def prepareForDefDef(tree: tpd.DefDef)(using ctx: Context): Context = - val hasGivenIOParameter = tree.paramss.exists { params => - params.exists { param => - param.tpe <:< io.namedType && param.mods.is(Flags.Given) - } - } - - if hasGivenIOParameter then ctx.withProperty(ioAvailableProperty, Some(true)) else ctx - - private def throwsIOLikeException(a: Annotation)(using ctx: Context): Option[Symbol] = - if a.symbol == ctx.definitions.ThrowsAnnot then - a.argument(0).flatMap(thrownException => ioLikeExceptions.find(ioLikeException => thrownException.tpe <:< ioLikeException.namedType)) - else None - - override def transformApply(tree: Apply)(implicit ctx: Context): Tree = - tree.fun.symbol.annotations - .flatMap(throwsIOLikeException) - .headOption // we only want to check once per method - .foreach: ex => - val ctxAtPhase = ctx.withPhase(this) // needed so that inferImplicit works, which checks if the phase allowsImplicitSearch - val ioAvailableAsImplicit = ctxAtPhase.typer.inferImplicit(io.namedType, EmptyTree, tree.span)(using ctxAtPhase).isSuccess - - if !ioAvailableAsImplicit && !ctxAtPhase.property(ioAvailableProperty).getOrElse(false) then - report.error( - s"""The `${tree.fun.symbol.showFullName}` method throws an `${ex.showFullName}`, - |but the `ox.IO` capability is not available in the implicit scope. - | - |Try adding a `using IO` clause to the enclosing method. - | - |In tests, you might `import ox.IO.globalForTesting.given`. Alternatively, - |you can wrap your code with `IO.unsafe`, however this should only be used - |in special circumstances, as it bypasses Ox's tracking of I/O.""".stripMargin, - tree.sourcePos - ) - - tree
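
As a migration sketch for downstream code affected by this change (the `CopyApp` name and its body below are hypothetical, assembled from the documentation examples updated above): code that previously took the `IO` capability alongside `Ox`, or wrapped stream operations in `IO.unsafe`, now only needs the `Ox` capability.

```scala
import ox.*
import ox.channels.Source
import java.io.ByteArrayInputStream

// Before this change the signature was: def run(args: Vector[String])(using Ox, IO): ExitCode,
// with the stream pipeline wrapped in IO.unsafe { ... }. After the change, only Ox is required.
object CopyApp extends OxApp:
  def run(args: Vector[String])(using Ox): ExitCode =
    val input = new ByteArrayInputStream("some input".getBytes)
    Source
      .fromInputStream(input) // no IO capability needed any more
      .decodeStringUtf8
      .map(_.toUpperCase)
      .foreach(println)
    ExitCode.Success
```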