From 85c5607170b6efa3394fa33a7393cfaf2b6c5133 Mon Sep 17 00:00:00 2001 From: Ian Streeter Date: Sat, 24 Jul 2021 09:43:52 +0100 Subject: [PATCH] common-fs2: File sink should rotate files with maximum size (close #440) --- config/config.pubsub.hocon.sample | 3 + .../snowplow/enrich/common/fs2/Run.scala | 8 +- .../enrich/common/fs2/config/io.scala | 2 +- .../snowplow/enrich/common/fs2/io/Sink.scala | 90 ++++++++++++++- .../enrich/common/fs2/io/SinkSpec.scala | 107 ++++++++++++++++++ 5 files changed, 200 insertions(+), 10 deletions(-) create mode 100644 modules/common-fs2/src/test/scala/com/snowplowanalytics/snowplow/enrich/common/fs2/io/SinkSpec.scala diff --git a/config/config.pubsub.hocon.sample b/config/config.pubsub.hocon.sample index a7e40d41d..3625a654a 100644 --- a/config/config.pubsub.hocon.sample +++ b/config/config.pubsub.hocon.sample @@ -20,6 +20,7 @@ # Local FS supported for testing purposes # "type": "FileSystem" # "file": "/var/enriched" + # "maxBytes": 1000000 } # Pii events output @@ -33,6 +34,7 @@ # Local FS supported for testing purposes # "type": "FileSystem" # "file": "/var/pii" + # "maxBytes": 1000000 } # Bad rows output @@ -43,6 +45,7 @@ # Local FS supported for testing purposes # "type": "FileSystem" # "file": "/var/bad" + # "maxBytes": 1000000 } # Optional. Concurrency of the app diff --git a/modules/common-fs2/src/main/scala/com/snowplowanalytics/snowplow/enrich/common/fs2/Run.scala b/modules/common-fs2/src/main/scala/com/snowplowanalytics/snowplow/enrich/common/fs2/Run.scala index bf2470716..69a8c0fc2 100644 --- a/modules/common-fs2/src/main/scala/com/snowplowanalytics/snowplow/enrich/common/fs2/Run.scala +++ b/modules/common-fs2/src/main/scala/com/snowplowanalytics/snowplow/enrich/common/fs2/Run.scala @@ -72,8 +72,8 @@ object Run { sinkGood = initAttributedSink(blocker, file.good, file.monitoring, mkSinkGood) sinkPii = file.pii.map(out => initAttributedSink(blocker, out, file.monitoring, mkSinkPii)) sinkBad = file.bad match { - case Output.FileSystem(path) => - Sink.fileSink[F](path, blocker) + case f: Output.FileSystem => + Sink.fileSink[F](f, blocker) case _ => mkSinkBad(blocker, file.bad, file.monitoring) } @@ -130,8 +130,8 @@ object Run { mkSinkGood: (Blocker, Output, Option[Monitoring]) => Resource[F, AttributedByteSink[F]] ): Resource[F, AttributedByteSink[F]] = output match { - case Output.FileSystem(path) => - Sink.fileSink[F](path, blocker).map(sink => row => sink(row.data)) + case f: Output.FileSystem => + Sink.fileSink[F](f, blocker).map(sink => row => sink(row.data)) case _ => mkSinkGood(blocker, output, monitoring) } diff --git a/modules/common-fs2/src/main/scala/com/snowplowanalytics/snowplow/enrich/common/fs2/config/io.scala b/modules/common-fs2/src/main/scala/com/snowplowanalytics/snowplow/enrich/common/fs2/config/io.scala index 625677938..c052dc093 100644 --- a/modules/common-fs2/src/main/scala/com/snowplowanalytics/snowplow/enrich/common/fs2/config/io.scala +++ b/modules/common-fs2/src/main/scala/com/snowplowanalytics/snowplow/enrich/common/fs2/config/io.scala @@ -182,7 +182,7 @@ object io { throw new IllegalArgumentException(s"Cannot construct Output.PubSub from $topic") } } - case class FileSystem(file: Path) extends Output + case class FileSystem(file: Path, maxBytes: Option[Long]) extends Output case class Kinesis( streamName: String, region: Option[String], diff --git a/modules/common-fs2/src/main/scala/com/snowplowanalytics/snowplow/enrich/common/fs2/io/Sink.scala 
b/modules/common-fs2/src/main/scala/com/snowplowanalytics/snowplow/enrich/common/fs2/io/Sink.scala index f0a3e3e21..c6a13f6dc 100644 --- a/modules/common-fs2/src/main/scala/com/snowplowanalytics/snowplow/enrich/common/fs2/io/Sink.scala +++ b/modules/common-fs2/src/main/scala/com/snowplowanalytics/snowplow/enrich/common/fs2/io/Sink.scala @@ -19,17 +19,25 @@ import java.nio.channels.FileChannel import cats.implicits._ import cats.effect.{Blocker, Concurrent, ContextShift, Resource, Sync} -import cats.effect.concurrent.Semaphore +import cats.effect.concurrent.{Ref, Semaphore} +import fs2.Hotswap + +import com.snowplowanalytics.snowplow.enrich.common.fs2.config.io.Output.{FileSystem => FileSystemConfig} import com.snowplowanalytics.snowplow.enrich.common.fs2.ByteSink object Sink { - def fileSink[F[_]: Concurrent: ContextShift](path: Path, blocker: Blocker): Resource[F, ByteSink[F]] = + def fileSink[F[_]: Concurrent: ContextShift](config: FileSystemConfig, blocker: Blocker): Resource[F, ByteSink[F]] = + config.maxBytes match { + case Some(max) => rotatingFileSink(config.file, max, blocker) + case None => singleFileSink(config.file, blocker) + } + + /** Writes all events to a single file. Used when `maxBytes` is missing from configuration */ + def singleFileSink[F[_]: Concurrent: ContextShift](path: Path, blocker: Blocker): Resource[F, ByteSink[F]] = for { - channel <- Resource.fromAutoCloseableBlocking(blocker)( - Sync[F].delay(FileChannel.open(path, StandardOpenOption.CREATE_NEW, StandardOpenOption.WRITE)) - ) + channel <- makeChannel(blocker, path) sem <- Resource.eval(Semaphore(1L)) } yield { bytes => sem.withPermit { @@ -39,4 +47,76 @@ object Sink { }.void } } + + /** + * Opens a new file when the existing file exceeds `maxBytes` + * Each file has an integer suffix e.g. 
/path/to/good.0001 + */ + def rotatingFileSink[F[_]: Concurrent: ContextShift]( + path: Path, + maxBytes: Long, + blocker: Blocker + ): Resource[F, ByteSink[F]] = + for { + (hs, first) <- Hotswap(makeFile(blocker, 1, path)) + ref <- Resource.eval(Ref.of(first)) + sem <- Resource.eval(Semaphore(1L)) + } yield { bytes => + sem.withPermit { + for { + state <- ref.get + state <- maybeRotate(blocker, hs, path, state, maxBytes, bytes.size) + state <- writeLine(blocker, state, bytes) + _ <- ref.set(state) + } yield () + } + } + + case class FileState( + index: Int, + channel: FileChannel, + bytes: Int + ) + + private def makeChannel[F[_]: Sync: ContextShift](blocker: Blocker, path: Path): Resource[F, FileChannel] = + Resource.fromAutoCloseableBlocking(blocker) { + Sync[F].delay(FileChannel.open(path, StandardOpenOption.CREATE_NEW, StandardOpenOption.WRITE)) + } + + private def makeFile[F[_]: Sync: ContextShift]( + blocker: Blocker, + index: Int, + base: Path + ): Resource[F, FileState] = { + val path = base.resolveSibling(f"${base.getFileName}%s.$index%04d") + makeChannel(blocker, path).map { fc => + FileState(index, fc, 0) + } + } + + private def writeLine[F[_]: Sync: ContextShift]( + blocker: Blocker, + state: FileState, + bytes: Array[Byte] + ): F[FileState] = + blocker + .delay { + state.channel.write(ByteBuffer.wrap(bytes)) + state.channel.write(ByteBuffer.wrap(Array('\n'.toByte))) + } + .as(state.copy(bytes = state.bytes + bytes.length + 1)) + + private def maybeRotate[F[_]: Sync: ContextShift]( + blocker: Blocker, + hs: Hotswap[F, FileState], + base: Path, + state: FileState, + maxBytes: Long, + bytesToWrite: Int + ): F[FileState] = + if (state.bytes + bytesToWrite > maxBytes) + hs.swap(makeFile(blocker, state.index + 1, base)) + else + Sync[F].pure(state) + } diff --git a/modules/common-fs2/src/test/scala/com/snowplowanalytics/snowplow/enrich/common/fs2/io/SinkSpec.scala b/modules/common-fs2/src/test/scala/com/snowplowanalytics/snowplow/enrich/common/fs2/io/SinkSpec.scala new file mode 100644 index 000000000..d85ba6c01 --- /dev/null +++ b/modules/common-fs2/src/test/scala/com/snowplowanalytics/snowplow/enrich/common/fs2/io/SinkSpec.scala @@ -0,0 +1,107 @@ +/* + * Copyright (c) 2021 Snowplow Analytics Ltd. All rights reserved. + * + * This program is licensed to you under the Apache License Version 2.0, + * and you may not use this file except in compliance with the Apache License Version 2.0. + * You may obtain a copy of the Apache License Version 2.0 at http://www.apache.org/licenses/LICENSE-2.0. + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the Apache License Version 2.0 is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the Apache License Version 2.0 for the specific language governing permissions and limitations there under. 
+ */ +package com.snowplowanalytics.snowplow.enrich.common.fs2.io + +import cats.implicits._ +import cats.effect.{Blocker, IO} +import cats.effect.testing.specs2.CatsIO +import java.nio.file.{Files, Path} +import scala.concurrent.ExecutionContext +import scala.jdk.CollectionConverters._ +import scala.io.{Source => ScalaSource} + +import org.specs2.mutable.Specification + +class SinkSpec extends Specification with CatsIO { + + "rotating file sink" should { + + "write to a single file if max bytes is not exceeded" in { + val dir = Files.createTempDirectory("enrich-sink-spec") + val blocker = Blocker.liftExecutionContext(ExecutionContext.global) + val maxBytes = 100L + + val write = Sink.rotatingFileSink[IO](dir.resolve("out"), maxBytes, blocker).use { sink => + for { + _ <- sink("AAAAA".getBytes) + _ <- sink("BBBBB".getBytes) + _ <- sink("CCCCC".getBytes) + } yield () + } + + for { + _ <- write + written <- filesInDir(dir) + withContent <- zipWithContent(written) + } yield { + withContent must have size 1 + val (path, content) = withContent.head + + path.getFileName.toString must be_==("out.0001") + content must_== (List("AAAAA", "BBBBB", "CCCCC")) + } + } + + "rotate files when max bytes is exceeded" in { + val dir = Files.createTempDirectory("enrich-sink-spec") + val blocker = Blocker.liftExecutionContext(ExecutionContext.global) + val maxBytes = 15L + + val write = Sink.rotatingFileSink[IO](dir.resolve("out"), maxBytes, blocker).use { sink => + for { + _ <- sink("AAAAA".getBytes) + _ <- sink("BBBBB".getBytes) + _ <- sink("CCCCC".getBytes) + _ <- sink("DDDDD".getBytes) + _ <- sink("EEEEE".getBytes) + } yield () + } + + for { + _ <- write + written <- filesInDir(dir) + withContent <- zipWithContent(written) + } yield { + withContent must have size 3 + written.map(_.getFileName.toString) must_== (List("out.0001", "out.0002", "out.0003")) + + withContent.map(_._2) must_== (List( + List("AAAAA", "BBBBB"), + List("CCCCC", "DDDDD"), + List("EEEEE") + )) + } + } + } + + def filesInDir(dir: Path): IO[List[Path]] = + IO.delay { + Files.list(dir) + }.bracket { stream => + IO.delay(stream.iterator.asScala.toList) + } { stream => + IO.delay(stream.close()) + }.map(_.sorted) + + def zipWithContent(files: List[Path]): IO[List[(Path, List[String])]] = + files.traverse { path => + IO.delay { + ScalaSource.fromFile(path.toFile) + }.bracket { source => + IO.delay(source.getLines().toList).map(lines => path -> lines) + } { source => + IO.delay(source.close()) + } + } + +}
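
Editor's usage sketch (not part of the patch): the snippet below shows how the
new `maxBytes` field on `Output.FileSystem` selects between the two sinks this
change introduces. It assumes the patched module is on the classpath and runs
under cats-effect 2's `IOApp`, which supplies the implicit `ContextShift[IO]`
that `Sink.fileSink` requires; the object name, output path and 1 MB threshold
are illustrative values, not taken from this change.

import java.nio.file.Paths

import scala.concurrent.ExecutionContext

import cats.implicits._
import cats.effect.{Blocker, ExitCode, IO, IOApp}

import com.snowplowanalytics.snowplow.enrich.common.fs2.config.io.Output.{FileSystem => FileSystemConfig}
import com.snowplowanalytics.snowplow.enrich.common.fs2.io.Sink

object RotatingSinkExample extends IOApp {

  def run(args: List[String]): IO[ExitCode] = {
    // Illustrative threshold: rotate before a write would take the current
    // file past ~1 MB. maxBytes = None would select singleFileSink instead.
    val config  = FileSystemConfig(Paths.get("/tmp/enriched"), Some(1000000L))
    val blocker = Blocker.liftExecutionContext(ExecutionContext.global)

    // Because maxBytes is Some(...), fileSink delegates to rotatingFileSink,
    // so records land in /tmp/enriched.0001, /tmp/enriched.0002, ...
    // Channels are opened with CREATE_NEW, so these files must not yet exist.
    Sink.fileSink[IO](config, blocker).use { sink =>
      List("event-1", "event-2", "event-3").traverse_(s => sink(s.getBytes))
    }.as(ExitCode.Success)
  }
}

One behavioural detail worth noting: `maybeRotate` compares `state.bytes +
bytesToWrite` against `maxBytes` before writing, so rotation happens just
before a record would cross the threshold. The incoming record's trailing
newline is not included in that check, so a file can end up at most one byte
over `maxBytes`.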