Skip to content

Commit

Permalink
Add tap and tapAsView operators and document how to use them for …
Browse files Browse the repository at this point in the history
…logging (#170)
  • Loading branch information
micossow authored Jul 2, 2024
1 parent 6771b49 commit 8d52c68
Show file tree
Hide file tree
Showing 4 changed files with 83 additions and 1 deletion.
32 changes: 31 additions & 1 deletion core/src/main/scala/ox/channels/SourceOps.scala
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,19 @@ trait SourceOps[+T] { outer: Source[T] =>
override val delegate: JSource[Any] = outer.delegate.asInstanceOf[JSource[T]].collectAsView(t => f(t))
}

/** Lazily-evaluated tap: creates a view of this source, where the results of [[receive]] will be applied to the given function `f` on the
* consumer's thread. Useful for side-effects without result values, like logging and debugging. For an eager, asynchronous version, see
* [[tap]].
*
* The same logic applies to receive clauses created using this source, which can be used in [[select]].
*
* @param f
* The consumer function.
* @return
* A source which is a view of this source, with the consumer function applied.
*/
def tapAsView(f: T => Unit): Source[T] = mapAsView(t => { f(t); t })

/** Lazily-evaluated filter: Creates a view of this source, where the results of [[receive]] will be filtered on the consumer's thread
* using the given predicate `p`. For an eager, asynchronous version, see [[filter]].
*
Expand Down Expand Up @@ -70,7 +83,7 @@ trait SourceOps[+T] { outer: Source[T] =>
/** Applies the given mapping function `f` to each element received from this source, and sends the results to the returned channel.
*
* Errors from this channel are propagated to the returned channel. Any exceptions that occur when invoking `f` are propagated as errors
* to the returned channel as wel.
* to the returned channel as well.
*
* Must be run within a scope, as a child fork is created, which receives from this source and sends the mapped values to the resulting
* one.
Expand Down Expand Up @@ -101,6 +114,23 @@ trait SourceOps[+T] { outer: Source[T] =>
}
c2

/** Applies the given consumer function `f` to each element received from this source.
*
* Errors from this channel are propagated to the returned channel. Any exceptions that occur when invoking `f` are propagated as errors
* to the returned channel as well.
*
* Must be run within a scope, as a child fork is created, which receives from this source and sends the mapped values to the resulting
* one.
*
* Useful for side-effects without result values, like logging and debugging. For a lazily-evaluated version, see [[tapAsView]].
*
* @param f
* The consumer function.
* @return
* A source, which the elements from the input source are passed to.
*/
def tap(f: T => Unit)(using Ox, StageCapacity): Source[T] = map(t => { f(t); t })

/** Intersperses this source with provided element and forwards it to the returned channel.
*
* @param inject
Expand Down
22 changes: 22 additions & 0 deletions core/src/test/scala/ox/channels/SourceOpsAsViewTest.scala
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@ import org.scalatest.flatspec.AnyFlatSpec
import org.scalatest.matchers.should.Matchers
import ox.*

import java.util.concurrent.atomic.AtomicInteger
import scala.jdk.CollectionConverters.*
import scala.util.{Failure, Try}

Expand Down Expand Up @@ -111,6 +112,27 @@ class SourceOpsAsViewTest extends AnyFlatSpec with Matchers with Eventually {
}
}

it should "tap over a source as a view" in {
val c: Channel[Int] = Channel.rendezvous
val sum = new AtomicInteger()

supervised {
fork {
c.send(1)
c.send(2)
c.send(3)
c.done()
}

val s2 = c.tapAsView(v => sum.addAndGet(v).discard)
s2.receive() shouldBe 1
s2.receive() shouldBe 2
s2.receive() shouldBe 3
s2.receiveOrClosed() shouldBe ChannelClosed.Done
sum.get() shouldBe 6
}
}

it should "propagate exceptions to the calling select" in {
val c: Channel[Int] = Channel.rendezvous

Expand Down
10 changes: 10 additions & 0 deletions core/src/test/scala/ox/channels/SourceOpsTest.scala
Original file line number Diff line number Diff line change
Expand Up @@ -5,9 +5,11 @@ import org.scalatest.flatspec.AnyFlatSpec
import org.scalatest.matchers.should.Matchers
import ox.*

import java.util.concurrent.atomic.AtomicInteger
import scala.concurrent.duration.DurationInt

class SourceOpsTest extends AnyFlatSpec with Matchers with Eventually {

it should "timeout" in {
supervised {
val c = Source.timeout(100.millis)
Expand Down Expand Up @@ -65,4 +67,12 @@ class SourceOpsTest extends AnyFlatSpec with Matchers with Eventually {
s.toList shouldBe List("a", "b", "c", "d", "e", "f", "g", "h", "i")
}
}

it should "tap over a source" in {
supervised {
val sum = new AtomicInteger()
Source.fromValues(1, 2, 3).tap(v => sum.addAndGet(v).discard).toList shouldBe List(1, 2, 3)
sum.get() shouldBe 6
}
}
}
20 changes: 20 additions & 0 deletions doc/channels/transforming-sources.md
Original file line number Diff line number Diff line change
Expand Up @@ -80,3 +80,23 @@ When dealing with Sources with chunks of bytes or Strings, you can leverage foll
* `decodeStringUtf8` to decode a `Source[Chunk[Byte]]` into a `Source[String]`, without handling line breaks, just processing input bytes as UTF-8 characters, even if a multi-byte character is divided into two chunks.

Such operations may be useful when dealing with I/O like files, `InputStream`, etc.. See [examples here](io.md).

## Logging

Ox does not have any integrations with logging libraries, but it provides a simple way to log elements flowing through channels
using the `.tap` (eagerly evaluated) or `.tapAsView` (lazily evaluated) methods.

```scala mdoc:compile-only
import ox.supervised
import ox.channels.Source

supervised {
Source.fromValues(1, 2, 3)
.tap(n => println(s"Received: $n")) // prints as soon as the element is sent from the source
.toList

Source.fromValues(1, 2, 3)
.tapAsView(n => println(s"Received: $n")) // prints when the element is consumed by `toList`
.toList
}
```

0 comments on commit 8d52c68

Please sign in to comment.