-
Notifications
You must be signed in to change notification settings - Fork 2
/
QueuedLogger.scala
42 lines (35 loc) · 1 KB
/
QueuedLogger.scala
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
package logger
package fs2interop
import cats.effect.Async
import cats.effect.Resource
import cats.effect.std.QueueSink
import scala.concurrent.duration._
import logger.Log
import java.nio.charset.StandardCharsets
/** Factories for [[LoggerKernel]] instances that hand each built [[Log]] record to a
  * `QueueSink` obtained from `StreamFunnel`, instead of writing it at the call site.
  */
object QueuedLogger {

  /** Builds a kernel whose records are rendered with `SingleLineRenderer.render`,
    * UTF-8 encoded and piped to standard output.
    *
    * @param capacity     forwarded unchanged to `StreamFunnel` (presumably the queue bound;
    *                     confirm against `StreamFunnel`)
    * @param drainTimeout forwarded unchanged to `StreamFunnel` (presumably the time allowed
    *                     to flush pending records on release; confirm against `StreamFunnel`)
    * @return a `Resource` that yields the kernel
    */
  def toStdout[F[_]: Async](
      capacity: Int = 1024,
      drainTimeout: FiniteDuration = 1.second
  ): Resource[F, LoggerKernel[F]] = {
    // Render each record to text, encode it, and write the bytes to stdout.
    val stdoutPipe: fs2.Pipe[F, Log, Unit] = records =>
      records
        .map(SingleLineRenderer.render)
        .through(fs2.text.utf8Encode)
        .through(fs2.io.stdout)

    make(capacity, drainTimeout, stdoutPipe)
  }

  /** Builds a kernel whose records are consumed by the supplied `sink`.
    *
    * All three arguments are passed straight to `StreamFunnel`; the sink it yields is
    * wrapped in an [[Impl]].
    *
    * @param capacity     forwarded unchanged to `StreamFunnel`
    * @param drainTimeout forwarded unchanged to `StreamFunnel`
    * @param sink         pipe that consumes the stream of `Log` records
    * @return a `Resource` that yields the kernel
    */
  def make[F[_]: Async](
      capacity: Int,
      drainTimeout: FiniteDuration,
      sink: fs2.Pipe[F, Log, Unit]
  ): Resource[F, LoggerKernel[F]] =
    StreamFunnel[F, Log](capacity, drainTimeout, sink).map(funnel => new Impl(funnel))

  /** [[LoggerKernel]] that offers every built record to `recordsSink`.
    *
    * @param recordsSink destination for built `Log` records
    */
  private[fs2interop] final class Impl[F[_]](
      recordsSink: QueueSink[F, Log]
  ) extends LoggerKernel[F] {

    /** Applies `record` to a fresh mutable builder, builds the `Log`, and offers it to the sink.
      *
      * The record is built when this method is called; only the enqueue is deferred into `F`.
      */
    def log(record: Log.Builder => Log.Builder): F[Unit] = {
      val entry = record(Log.mutableBuilder()).build()
      recordsSink.offer(entry)
    }
  }
}