Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

IO capability & compiler plugin #140

Merged
merged 14 commits into from
May 22, 2024
61 changes: 58 additions & 3 deletions build.sbt
Original file line number Diff line number Diff line change
Expand Up @@ -24,10 +24,22 @@ compileDocumentation := {
(documentation / mdoc).toTask(" --out target/ox-doc").value
}

val useRequireIOPlugin =
// Based on:
// https://stackoverflow.com/questions/54660122/how-to-include-a-scala-compiler-plugin-from-a-local-path
Compile / scalacOptions ++= {
val jar = (plugin / Compile / Keys.`package`).value
System.setProperty("sbt.paths.plugin.jar", jar.getAbsolutePath)

val addPlugin = "-Xplugin:" + jar.getAbsolutePath
val dummy = "-Jdummy=" + jar.lastModified
Seq(addPlugin, dummy)
}

lazy val rootProject = (project in file("."))
.settings(commonSettings)
.settings(publishArtifact := false, name := "ox")
.aggregate(core, examples, kafka)
.aggregate(core, plugin, pluginTest, examples, kafka)

lazy val core: Project = (project in file("core"))
.settings(commonSettings)
Expand All @@ -36,7 +48,49 @@ lazy val core: Project = (project in file("core"))
libraryDependencies ++= Seq(
"com.softwaremill.jox" % "core" % "0.2.0",
scalaTest
)
),
// Check IO usage in core
useRequireIOPlugin
)

lazy val plugin: Project = (project in file("plugin"))
.settings(commonSettings)
.settings(
name := "plugin",
libraryDependencies ++= Seq("org.scala-lang" %% "scala3-compiler" % scalaVersion.value, scalaTest)
)

lazy val pluginTest: Project = (project in file("plugin-test"))
.settings(commonSettings)
.settings(
name := "plugin-test",
libraryDependencies ++= Seq(
"org.scala-lang" %% "scala3-compiler" % scalaVersion.value,
scalaTest
),
publishArtifact := false,
// Playground testing
useRequireIOPlugin,
// Unit testing, based on:
// https://github.com/xebia-functional/munit-compiler-toolkit/
Test / javaOptions += {
val dependencyJars = (Compile / dependencyClasspath).value.files
val thisJar = (Compile / Keys.`package`).value
val `scala-compiler-classpath` =
(dependencyJars :+ thisJar)
.map(_.toPath.toAbsolutePath.toString())
.mkString(":")
s"-Dscala-compiler-classpath=${`scala-compiler-classpath`}"
},
Test / javaOptions += Def.taskDyn {
Def.task {
val _ = (plugin / Compile / Keys.`package`).value
val `scala-compiler-options` =
s"${(plugin / Compile / packageBin).value}"
s"""-Dscala-compiler-plugin=${`scala-compiler-options`}"""
}
}.value,
Test / fork := true
)

lazy val examples: Project = (project in file("examples"))
Expand All @@ -63,7 +117,8 @@ lazy val kafka: Project = (project in file("kafka"))
"org.apache.pekko" %% "pekko-connectors-kafka" % "1.0.0" % Test,
"org.apache.pekko" %% "pekko-stream" % "1.0.2" % Test,
scalaTest
)
),
useRequireIOPlugin
)
.dependsOn(core)

Expand Down
36 changes: 36 additions & 0 deletions core/src/main/scala/ox/IO.scala
Original file line number Diff line number Diff line change
@@ -0,0 +1,36 @@
package ox

/** Represents a capability to perform I/O operations. The capability should be part of the signature of any method, which either performs
* I/O operations, or calls other methods which perform I/O operations. For example:
*
* {{{
* def readFromFile(path: String)(using IO): String = ...
* def writeToFile(path: String, content: String)(using IO): Unit = ...
* def transform(path: String)(f: String => String)(using IO): Unit =
* writeToFile(path, f(readFromFile(path)))
* }}}
*
* The capability can be introduced using [[IO.unsafe]] or by `import ox.IO.globalForTesting.given`.
*
* Take care not to capture the capability e.g. using constructors (unless you are sure such usage is safe), as this might circumvent the
* tracking of I/O operations. Similarly, the capability might be captured by lambdas, which might later be used when the IO capability is
* not in scope. In future Scala and Ox releases, these problems should be detected at compile-time using the upcoming capture checker.
*/
class IO private[ox] ()

object IO:
private val IOInstance = new IO

/** Grants the [[IO]] capability when executing the given block of code. Ideally should **only** be used:
* - at the edges of your application (e.g. in the `main` method)
* - when integrating with third-party libraries
*
* In tests, use [[globalForTesting]].
*/
inline def unsafe[T](f: IO ?=> T): T = f(using IOInstance)

/** When imported using `import ox.IO.globalForTesting.given`, grants the [[IO]] capability globally within the scope of the import.
* Designed to be used in tests.
*/
object globalForTesting:
given IO = IOInstance
4 changes: 2 additions & 2 deletions core/src/main/scala/ox/channels/SourceCompanionIOOps.scala
Original file line number Diff line number Diff line change
Expand Up @@ -23,7 +23,7 @@ trait SourceCompanionIOOps:
* @return
* a `Source` of chunks of bytes.
*/
def fromInputStream(is: InputStream, chunkSize: Int = 1024)(using Ox, StageCapacity): Source[Chunk[Byte]] =
def fromInputStream(is: InputStream, chunkSize: Int = 1024)(using Ox, StageCapacity, IO): Source[Chunk[Byte]] =
val chunks = StageCapacity.newChannel[Chunk[Byte]]
fork {
try
Expand Down Expand Up @@ -63,7 +63,7 @@ trait SourceCompanionIOOps:
* @throws SecurityException
* If SecurityManager error occurs when opening the file.
*/
def fromFile(path: Path, chunkSize: Int = 1024)(using Ox, StageCapacity): Source[Chunk[Byte]] =
def fromFile(path: Path, chunkSize: Int = 1024)(using Ox, StageCapacity, IO): Source[Chunk[Byte]] =
if Files.isDirectory(path) then throw new IOException(s"Path $path is a directory")
val chunks = StageCapacity.newChannel[Chunk[Byte]]
val jFileChannel =
Expand Down
6 changes: 3 additions & 3 deletions core/src/main/scala/ox/channels/SourceIOOps.scala
Original file line number Diff line number Diff line change
Expand Up @@ -39,7 +39,7 @@ trait SourceIOOps[+T]:
* @throws IOException
* if an error occurs when writing or closing of the `OutputStream`.
*/
def toOutputStream(outputStream: OutputStream)(using T <:< Chunk[Byte]): Unit =
def toOutputStream(outputStream: OutputStream)(using T <:< Chunk[Byte], IO): Unit =
repeatWhile {
outer.receiveOrClosed() match
case ChannelClosed.Done =>
Expand All @@ -64,7 +64,7 @@ trait SourceIOOps[+T]:
* @throws IOException
* if an error occurs when opening the file or during the write process.
*/
def toFile(path: Path)(using T <:< Chunk[Byte]): Unit =
def toFile(path: Path)(using T <:< Chunk[Byte], IO): Unit =
if Files.isDirectory(path) then throw new IOException(s"Path $path is a directory")
val jFileChannel =
try {
Expand All @@ -91,7 +91,7 @@ trait SourceIOOps[+T]:
throw e
}

private inline def close(closeable: Closeable, cause: Option[Throwable] = None): Unit =
private inline def close(closeable: Closeable, cause: Option[Throwable] = None)(using IO): Unit =
try
closeable.close()
catch
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,7 @@ import java.nio.file.Paths
import java.io.IOException

class SourceCompanionIOOpsTest extends AnyWordSpec with Matchers:
import ox.IO.globalForTesting.given

def emptyInputStream: TestInputStream = new TestInputStream("")
def inputStream(text: String, failing: Boolean = false): TestInputStream = new TestInputStream(text, failing)
Expand Down Expand Up @@ -81,7 +82,7 @@ class SourceCompanionIOOpsTest extends AnyWordSpec with Matchers:
private def toStrings(source: Source[Chunk[Byte]]): List[String] =
source.toList.map(_.asStringUtf8)

class TestInputStream(text: String, throwOnRead: Boolean = false) extends ByteArrayInputStream(text.getBytes):
class TestInputStream(text: String, throwOnRead: Boolean = false)(using IO) extends ByteArrayInputStream(text.getBytes):
val closed: AtomicBoolean = new AtomicBoolean(false)

override def close(): Unit =
Expand Down
3 changes: 2 additions & 1 deletion core/src/test/scala/ox/channels/SourceIOOpsTest.scala
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@ import java.nio.file.Paths
import java.util.concurrent.atomic.AtomicBoolean

class SourceIOOpsTest extends AnyWordSpec with Matchers:
import ox.IO.globalForTesting.given

def inputStreamToString(is: InputStream)(using Ox): String = {
val source = useInScope(scala.io.Source.fromInputStream(is))(_.close())
Expand Down Expand Up @@ -170,7 +171,7 @@ class SourceIOOpsTest extends AnyWordSpec with Matchers:
private def fileContent(path: Path)(using Ox): List[String] =
Source.fromFile(path).toList.map(_.asStringUtf8)

class TestOutputStream(throwOnWrite: Boolean = false) extends ByteArrayOutputStream:
class TestOutputStream(throwOnWrite: Boolean = false)(using IO) extends ByteArrayOutputStream:
val closed: AtomicBoolean = new AtomicBoolean(false)

override def close(): Unit =
Expand Down
1 change: 1 addition & 0 deletions core/src/test/scala/ox/channels/SourceTextOpsTest.scala
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@ import org.scalatest.wordspec.AnyWordSpec
import java.nio.charset.Charset

class SourceTextOpsTest extends AnyWordSpec with Matchers {
import ox.IO.globalForTesting.given

"source.linesUtf8" should {

Expand Down
62 changes: 33 additions & 29 deletions doc/channels/io.md
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,8 @@

Ox allows creating a `Source` which reads from a file or `InpuStream`, as well as directing an existing source into a file or an `OutputStream`. These methods work only with a `Source[Chunk[Byte]]`. Ox takes care of closing files/streams after processing and on errors.

All I/O operations require the [IO capability](../io.md).

## InputStream and OutputStream

### Source.fromInputStream
Expand All @@ -10,17 +12,18 @@ An `InputStream` can be converted to a `Source[Chunk[Byte]]`:

```scala mdoc:compile-only
import ox.channels.Source
import ox.supervised
import ox.{IO, supervised}
import java.io.ByteArrayInputStream
import java.io.InputStream

val inputStream: InputStream = new ByteArrayInputStream("some input".getBytes)
supervised {
Source
.fromInputStream(inputStream) // Source[Chunk[Byte]]
.decodeStringUtf8
.map(_.toUpperCase)
.foreach(println) // "SOME INPUT"
IO.unsafe:
Source
.fromInputStream(inputStream) // Source[Chunk[Byte]]
.decodeStringUtf8
.map(_.toUpperCase)
.foreach(println) // "SOME INPUT"
}
```

Expand All @@ -29,17 +32,18 @@ You can define a custom chunk size instead of using the default:

```scala mdoc:compile-only
import ox.channels.Source
import ox.supervised
import ox.{IO, supervised}
import java.io.ByteArrayInputStream
import java.io.InputStream

val inputStream: InputStream = new ByteArrayInputStream("some input".getBytes)
supervised {
Source
.fromInputStream(inputStream, chunkSize = 4) // Source[Chunk[Byte]]
.decodeStringUtf8
.map(_.toUpperCase)
.foreach(println) // "SOME", " INPUT"
IO.unsafe:
Source
.fromInputStream(inputStream, chunkSize = 4) // Source[Chunk[Byte]]
.decodeStringUtf8
.map(_.toUpperCase)
.foreach(println) // "SOME", " INPUT"
}
```

Expand All @@ -49,17 +53,16 @@ A `Source[Chunk[Byte]]` can be directed to write to an `OutputStream`:

```scala mdoc:compile-only
import ox.channels.Source
import ox.Chunk
import ox.supervised
import ox.{Chunk, IO, supervised}
import java.io.ByteArrayOutputStream
import java.io.OutputStream

val outputStream = new ByteArrayOutputStream()
supervised {
val source = Source.fromIterable(List("text1,", "text2"))
source
.encodeUtf8
.toOutputStream(outputStream)
IO.unsafe:
source
.encodeUtf8
.toOutputStream(outputStream)
}
outputStream.toString // "TEXT1,TEXT2"
```
Expand All @@ -72,15 +75,16 @@ You can obtain a `Source` of byte chunks read from a file for a given path:

```scala mdoc:compile-only
import ox.channels.Source
import ox.supervised
import ox.{IO, supervised}
import java.nio.file.Paths

supervised {
Source
.fromFile(Paths.get("/path/to/my/file.txt"))
.linesUtf8
.map(_.toUpperCase)
.toList // List("FILE_LINE1", "FILE_LINE2")
IO.unsafe:
Source
.fromFile(Paths.get("/path/to/my/file.txt"))
.linesUtf8
.map(_.toUpperCase)
.toList // List("FILE_LINE1", "FILE_LINE2")
}
```

Expand All @@ -93,14 +97,14 @@ A `Source[Chunk[Byte]]` can be written to a file under a given path:
```scala mdoc:compile-only

import ox.channels.Source
import ox.Chunk
import ox.supervised
import ox.{Chunk, IO, supervised}
import java.nio.file.Paths

supervised {
val source = Source.fromIterable(List("text1,", "text2"))
source
.encodeUtf8
.toFile(Paths.get("/path/to/my/target/file.txt"))
IO.unsafe:
source
.encodeUtf8
.toFile(Paths.get("/path/to/my/target/file.txt"))
}
```
3 changes: 2 additions & 1 deletion doc/index.md
Original file line number Diff line number Diff line change
Expand Up @@ -45,8 +45,9 @@ In addition to this documentation, ScalaDocs can be browsed at [https://javadoc.

.. toctree::
:maxdepth: 2
:caption: Resources, resiliency & utilities
:caption: Resiliency, I/O & utilities

io
retries
resources
control-flow
Expand Down
Loading
Loading