Skip to content

Commit

Permalink
Stream FS2: File sink should rotate files with maximum size (close #440)
Browse files Browse the repository at this point in the history
  • Loading branch information
istreeter authored and benjben committed Oct 13, 2021
1 parent 4f76c85 commit 9fb30c1
Show file tree
Hide file tree
Showing 5 changed files with 199 additions and 10 deletions.
3 changes: 3 additions & 0 deletions config/config.pubsub.hocon.sample
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@
# Local FS supported for testing purposes
# "type": "FileSystem"
# "file": "/var/enriched"
# "maxBytes": 1000000
}

# Pii events output
Expand All @@ -33,6 +34,7 @@
# Local FS supported for testing purposes
# "type": "FileSystem"
# "file": "/var/pii"
# "maxBytes": 1000000
}

# Bad rows output
Expand All @@ -43,6 +45,7 @@
# Local FS supported for testing purposes
# "type": "FileSystem"
# "file": "/var/bad"
# "maxBytes": 1000000
}

# Concurrency of the app
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -69,8 +69,8 @@ object Run {
goodSink = initAttributedSink(blocker, file.good, file.monitoring, mkGoodSink)
piiSink = file.pii.map(out => initAttributedSink(blocker, out, file.monitoring, mkPiiSink))
badSink = file.bad match {
case Output.FileSystem(path) =>
Sink.fileSink[F](path, blocker)
case f: Output.FileSystem =>
Sink.fileSink[F](f, blocker)
case _ =>
mkBadSink(blocker, file.bad, file.monitoring)
}
Expand Down Expand Up @@ -133,8 +133,8 @@ object Run {
mkGoodSink: (Blocker, Output, Option[Monitoring]) => Resource[F, AttributedByteSink[F]]
): Resource[F, AttributedByteSink[F]] =
output match {
case Output.FileSystem(path) =>
Sink.fileSink[F](path, blocker).map(sink => row => sink(row.data))
case f: Output.FileSystem =>
Sink.fileSink[F](f, blocker).map(sink => row => sink(row.data))
case _ =>
mkGoodSink(blocker, output, monitoring)
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -162,7 +162,7 @@ object io {
throw new IllegalArgumentException(s"Cannot construct Output.PubSub from $topic")
}
}
case class FileSystem(file: Path) extends Output
case class FileSystem(file: Path, maxBytes: Option[Long]) extends Output
case class Kinesis(
streamName: String,
region: Option[String],
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -19,17 +19,25 @@ import java.nio.channels.FileChannel
import cats.implicits._

import cats.effect.{Blocker, Concurrent, ContextShift, Resource, Sync}
import cats.effect.concurrent.Semaphore
import cats.effect.concurrent.{Ref, Semaphore}
import fs2.Hotswap

import com.snowplowanalytics.snowplow.enrich.common.fs2.config.io.Output.{FileSystem => FileSystemConfig}

import com.snowplowanalytics.snowplow.enrich.common.fs2.ByteSink

object Sink {

def fileSink[F[_]: Concurrent: ContextShift](path: Path, blocker: Blocker): Resource[F, ByteSink[F]] =
/**
 * Builds a file-backed sink from a `FileSystem` output configuration.
 * Delegates to `rotatingFileSink` when `maxBytes` is configured and to `singleFileSink` otherwise.
 */
def fileSink[F[_]: Concurrent: ContextShift](config: FileSystemConfig, blocker: Blocker): Resource[F, ByteSink[F]] =
  config.maxBytes.fold(singleFileSink[F](config.file, blocker)) { limit =>
    rotatingFileSink[F](config.file, limit, blocker)
  }

/** Writes all events to a single file. Used when `maxBytes` is missing from configuration */
def singleFileSink[F[_]: Concurrent: ContextShift](path: Path, blocker: Blocker): Resource[F, ByteSink[F]] =
for {
channel <- Resource.fromAutoCloseableBlocking(blocker)(
Sync[F].delay(FileChannel.open(path, StandardOpenOption.CREATE_NEW, StandardOpenOption.WRITE))
)
channel <- makeChannel(blocker, path)
sem <- Resource.eval(Semaphore(1L))
} yield { bytes =>
sem.withPermit {
Expand All @@ -40,4 +48,75 @@ object Sink {
}
}

/**
 * Opens a new file when the existing file exceeds `maxBytes`
 * Each file has an integer suffix e.g. /path/to/good.0001
 *
 * The first file (index 1) is opened when the resource is acquired. The currently open file is
 * owned by a `Hotswap`, and its bookkeeping ([[FileState]]) is kept in a `Ref`. Every call of the
 * returned sink runs under a single-permit semaphore, so rotation and writing never interleave
 * between concurrent callers.
 *
 * @param path base path; the actual files are siblings named `<file name>.<4-digit index>`
 * @param maxBytes size limit handed to `maybeRotate` to decide when to start a new file
 * @param blocker blocker on which the file operations are run
 * @return a resource yielding the sink
 */
def rotatingFileSink[F[_]: Concurrent: ContextShift](
  path: Path,
  maxBytes: Long,
  blocker: Blocker
): Resource[F, ByteSink[F]] =
  for {
    (hs, first) <- Hotswap(makeFile(blocker, 1, path))
    ref <- Resource.eval(Ref.of(first))
    sem <- Resource.eval(Semaphore(1L))
  } yield { bytes =>
    sem.withPermit {
      for {
        current <- ref.get
        rotated <- maybeRotate(blocker, hs, path, current, maxBytes, bytes.size)
        // Record the rotation before writing: once the Hotswap has swapped, the previous channel
        // is closed. If the write below failed (or was cancelled) without this, the Ref would
        // keep pointing at the closed channel and at an index whose successor file already exists.
        _ <- ref.set(rotated)
        written <- writeLine(blocker, rotated, bytes)
        _ <- ref.set(written)
      } yield ()
    }
  }

/**
 * Bookkeeping for the file that is currently being written by the rotating sink.
 *
 * NOTE(review): `bytes` is an `Int` while the rotation limit (`maxBytes`) is a `Long`, so the
 * counter cannot represent files larger than `Int.MaxValue` bytes - confirm this is acceptable.
 *
 * @param index numeric suffix of the current file
 * @param channel open channel of the current file
 * @param bytes number of bytes accounted as written to the current file so far
 */
case class FileState(index: Int, channel: FileChannel, bytes: Int)

/**
 * Opens `path` for writing as a managed resource.
 * The channel is opened with `CREATE_NEW` and `WRITE`, and the resource is built with
 * `Resource.fromAutoCloseableBlocking` on the given blocker so the channel is closed on release.
 */
private def makeChannel[F[_]: Sync: ContextShift](blocker: Blocker, path: Path): Resource[F, FileChannel] = {
  val open: F[FileChannel] = Sync[F].delay {
    FileChannel.open(path, StandardOpenOption.CREATE_NEW, StandardOpenOption.WRITE)
  }
  Resource.fromAutoCloseableBlocking(blocker)(open)
}

/**
 * Opens the file with the given index next to `base` and wraps it in a fresh [[FileState]].
 * The file name is the file name of `base` followed by a dot and the index zero-padded to
 * four digits; the byte counter of the returned state starts at zero.
 */
private def makeFile[F[_]: Sync: ContextShift](
  blocker: Blocker,
  index: Int,
  base: Path
): Resource[F, FileState] = {
  val fileName = f"${base.getFileName}%s.$index%04d"
  val target = base.resolveSibling(fileName)
  makeChannel[F](blocker, target).map(FileState(index, _, 0))
}

/**
 * Writes `bytes` followed by a newline byte to the channel of `state`, on the blocker.
 *
 * @return the state with its byte counter increased by the payload length plus one (the newline)
 */
private def writeLine[F[_]: Sync: ContextShift](
  blocker: Blocker,
  state: FileState,
  bytes: Array[Byte]
): F[FileState] = {
  val updated = state.copy(bytes = state.bytes + bytes.length + 1)
  val write = blocker.delay[F, Int] {
    state.channel.write(ByteBuffer.wrap(bytes))
    state.channel.write(ByteBuffer.wrap(Array('\n'.toByte)))
  }
  write.as(updated)
}

/**
 * Decides whether the next record still fits into the current file and, if not, swaps the
 * `Hotswap` over to the file with the next index.
 *
 * @param blocker blocker used to open the next file
 * @param hs hotswap holding the currently open file
 * @param base base path from which the rotated file names are derived
 * @param state bookkeeping of the currently open file
 * @param maxBytes maximum number of bytes a file should hold
 * @param bytesToWrite payload size of the record about to be written, without the trailing newline
 * @return the state to write to: either the unchanged `state` or the state of the new file
 */
private def maybeRotate[F[_]: Sync: ContextShift](
  blocker: Blocker,
  hs: Hotswap[F, FileState],
  base: Path,
  state: FileState,
  maxBytes: Long,
  bytesToWrite: Int
): F[FileState] = {
  // The record is written with a trailing newline, so that byte counts towards the limit too.
  // The sum is computed as Long so it cannot overflow before being compared with `maxBytes`.
  val sizeAfterWrite = state.bytes.toLong + bytesToWrite.toLong + 1L
  // Never rotate away from an empty file: a record that does not fit into an empty file will
  // not fit into the next one either, and rotating would only leave a zero-byte file behind.
  if (state.bytes > 0 && sizeAfterWrite > maxBytes)
    hs.swap(makeFile(blocker, state.index + 1, base))
  else
    Sync[F].pure(state)
}

}
Original file line number Diff line number Diff line change
@@ -0,0 +1,107 @@
/*
* Copyright (c) 2021 Snowplow Analytics Ltd. All rights reserved.
*
* This program is licensed to you under the Apache License Version 2.0,
* and you may not use this file except in compliance with the Apache License Version 2.0.
* You may obtain a copy of the Apache License Version 2.0 at http://www.apache.org/licenses/LICENSE-2.0.
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the Apache License Version 2.0 is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the Apache License Version 2.0 for the specific language governing permissions and limitations there under.
*/
package com.snowplowanalytics.snowplow.enrich.common.fs2.io

import cats.implicits._
import cats.effect.{Blocker, IO}
import cats.effect.testing.specs2.CatsIO
import java.nio.file.{Files, Path}
import scala.concurrent.ExecutionContext
import scala.jdk.CollectionConverters._
import scala.io.{Source => ScalaSource}

import org.specs2.mutable.Specification

/** Checks the file naming and the line distribution produced by `Sink.rotatingFileSink`. */
class SinkSpec extends Specification with CatsIO {

  "rotating file sink" should {

    "write to a single file if max bytes is not exceeded" in {
      writeThenRead(100L, List("AAAAA", "BBBBB", "CCCCC")).map { results =>
        results must have size(1)
        val (file, lines) = results.head

        file.getFileName.toString must be_==("out.0001")
        lines must_== (List("AAAAA", "BBBBB", "CCCCC"))
      }
    }

    "rotate files when max bytes is exceeded" in {
      writeThenRead(15L, List("AAAAA", "BBBBB", "CCCCC", "DDDDD", "EEEEE")).map { results =>
        results must have size(3)
        results.map(_._1.getFileName.toString) must_== (List("out.0001", "out.0002", "out.0003"))

        results.map(_._2) must_== (List(
          List("AAAAA", "BBBBB"),
          List("CCCCC", "DDDDD"),
          List("EEEEE")
        ))
      }
    }
  }

  /**
   * Sends each record, in order, through a rotating sink rooted at `out` inside a fresh temporary
   * directory, then returns the files found in that directory (sorted) with their lines.
   */
  private def writeThenRead(maxBytes: Long, records: List[String]): IO[List[(Path, List[String])]] = {
    val dir = Files.createTempDirectory("enrich-sink-spec")
    val blocker = Blocker.liftExecutionContext(ExecutionContext.global)

    val write = Sink.rotatingFileSink[IO](dir.resolve("out"), maxBytes, blocker).use { sink =>
      records.traverse_(record => sink(record.getBytes))
    }

    for {
      _ <- write
      written <- filesInDir(dir)
      withContent <- zipWithContent(written)
    } yield withContent
  }

  /** Lists the entries of `dir` in sorted order, closing the directory stream afterwards. */
  def filesInDir(dir: Path): IO[List[Path]] =
    IO.delay(Files.list(dir))
      .bracket(listing => IO.delay(listing.iterator.asScala.toList))(listing => IO.delay(listing.close()))
      .map(_.sorted)

  /** Pairs every file with the list of its lines, closing each source after reading. */
  def zipWithContent(files: List[Path]): IO[List[(Path, List[String])]] =
    files.traverse { file =>
      IO.delay(ScalaSource.fromFile(file.toFile))
        .bracket(src => IO.delay(file -> src.getLines().toList))(src => IO.delay(src.close()))
    }

}

0 comments on commit 9fb30c1

Please sign in to comment.