From 6dc77b5f8ecbde1b927325ad32ebeef5a1732df9 Mon Sep 17 00:00:00 2001 From: Benjamin Benoist Date: Wed, 8 Dec 2021 19:23:43 +0100 Subject: [PATCH] update Assets mechanism with semaphore --- .../snowplow/enrich/common/fs2/Assets.scala | 260 ++++++++---------- .../snowplow/enrich/common/fs2/Enrich.scala | 7 +- .../enrich/common/fs2/Environment.scala | 35 +-- .../snowplow/enrich/common/fs2/Run.scala | 2 +- 4 files changed, 130 insertions(+), 174 deletions(-) diff --git a/modules/common-fs2/src/main/scala/com/snowplowanalytics/snowplow/enrich/common/fs2/Assets.scala b/modules/common-fs2/src/main/scala/com/snowplowanalytics/snowplow/enrich/common/fs2/Assets.scala index 88394438e..aa219159c 100644 --- a/modules/common-fs2/src/main/scala/com/snowplowanalytics/snowplow/enrich/common/fs2/Assets.scala +++ b/modules/common-fs2/src/main/scala/com/snowplowanalytics/snowplow/enrich/common/fs2/Assets.scala @@ -13,7 +13,7 @@ package com.snowplowanalytics.snowplow.enrich.common.fs2 import java.net.URI -import java.nio.file.{Path, Paths} +import java.nio.file.{Path, Paths, StandardCopyOption} import scala.concurrent.duration._ import scala.util.control.NonFatal @@ -21,14 +21,14 @@ import scala.util.control.NonFatal import cats.{Applicative, Parallel} import cats.implicits._ -import cats.effect.{Blocker, Concurrent, ConcurrentEffect, ContextShift, Resource, Sync, Timer} -import cats.effect.concurrent.Ref +import cats.effect.{Blocker, Concurrent, ConcurrentEffect, ContextShift, Sync, Timer} +import cats.effect.concurrent.{Ref, Semaphore} import retry.{RetryDetails, RetryPolicies, RetryPolicy, retryingOnSomeErrors} import fs2.Stream import fs2.hash.md5 -import fs2.io.file.{copy, deleteIfExists, exists, readAll, tempFileResource, writeAll} +import fs2.io.file.{exists, move, readAll, tempFileResource, writeAll} import org.typelevel.log4cats.Logger import org.typelevel.log4cats.slf4j.Slf4jLogger @@ -37,64 +37,47 @@ import com.snowplowanalytics.snowplow.enrich.common.utils.BlockerF import com.snowplowanalytics.snowplow.enrich.common.fs2.io.Clients -/** - * Functions responsible for periodic assets (such as MaxMind/IAB DBs) updates - * The common logic is to periodically invoke a function that: - * 1. Downloads a file (in background) to a temp location - * 2. Compares file's checksum with existing one (stored in a mutable hashmap) - * 3. If checksums match - delete the temp file, return - * 4. If checksums don't match - send a signal to stop raw stream - * (via `SignallingRef` in [[Environment]]) - * 5. Once raw stream is stopped - delete an old file and move - * temp file to the old's file location - * If any of those URIs been updated and stopped the raw stream, it will be - * immediately resumed once the above procedure traversed all files - */ +/** Code in charge of downloading and updating the assets used by enrichments (e.g. MaxMind/IAB DBs). */ object Assets { private implicit def unsafeLogger[F[_]: Sync]: Logger[F] = Slf4jLogger.getLogger[F] /** - * State of the [[updateStream]], containing information about tracked URIs - * and `stop` signal from [[Environment]] as well as all clients necessary - * to download URIs - * - * @param files mutable hash map of URIs and their latest known state - * @param pauseEnrich stop signal coming from [[Environment]] and that can be used - * to stop the raw stream consumption - * @param clients HTTP, GCS, S3 clients if necessary + * Contains information about downloaded content and the clients to download URIs. + * @param hashes Hash map of URIs and their latest known state (hash). + * @param clients Clients to download URIs. */ final case class State[F[_]]( - files: Ref[F, Map[URI, Hash]], - pauseEnrich: Ref[F, Boolean], + hashes: Ref[F, Map[URI, Hash]], clients: Clients[F] ) object State { - /** Test pair is used in tests to initialize HTTP client, will be ignored during initialization */ + /** Test pair used in tests to initialize HTTP client, will be ignored during initialization. */ private val TestPair: Asset = URI.create("http://localhost:8080") -> "index" /** - * Initialize an assets state. Try to find them on local FS - * or download if they're missing. Also initializes all necessary - * clients (S3, GCP etc) - * @param blocker thread pool for downloading and reading files - * @param stop global stop signal from [[Environment]] - * @param assets all assets that have to be tracked - * @param http the HTTP client that we share with other parts of the application + * Initializes an assets state. + * Tries to find them on local FS and download them if they're missing. + * @param blocker Thread pool for downloading and reading files. + * @param sem Permit shared with the enriching, used while initializing the state. + * @param clients Clients to download the URIS. + * @param enrichments Configurations of the enrichments. Contains the list of assets. */ def make[F[_]: ConcurrentEffect: Timer: ContextShift]( blocker: Blocker, - stop: Ref[F, Boolean], - assets: List[Asset], - clients: Clients[F] - ): Resource[F, State[F]] = + sem: Semaphore[F], + clients: Clients[F], + enrichments: Ref[F, Environment.Enrichments[F]] + ): F[State[F]] = for { - map <- Resource.eval(build[F](blocker, clients, assets.filterNot(asset => asset == TestPair))) - files <- Resource.eval(Ref.of[F, Map[URI, Hash]](map)) - } yield State(files, stop, clients) + assets <- getAssetsList(enrichments) + map <- build[F](blocker, clients, assets.filterNot(asset => asset == TestPair)) + hashes <- Ref.of[F, Map[URI, Hash]](map) + _ <- sem.release + } yield State(hashes, clients) def build[F[_]: Concurrent: Timer: ContextShift]( blocker: Blocker, @@ -108,7 +91,7 @@ object Assets { case (uri, path, Some(hash)) => Logger[F].info(s"Asset from $uri is found on local system at $path").as(uri -> hash) case (uri, path, None) => - downloadAndHash[F](clients, blocker, uri, Paths.get(path)).map(hash => uri -> hash) + downloadAndHash[F](blocker, clients, uri, Paths.get(path)).map(hash => uri -> hash) } } .map(_.toMap) @@ -116,7 +99,7 @@ object Assets { def buildFromLocal[F[_]: Sync: ContextShift](blocker: Blocker, assets: List[Asset]): F[List[(URI, String, Option[Hash])]] = assets.traverse { case (uri, path) => local[F](blocker, path).map(hash => (uri, path, hash)) } - /** Check if file already exists */ + /** Checks if file already exists on filesystem. */ def local[F[_]: Sync: ContextShift](blocker: Blocker, path: String): F[Option[Hash]] = { val fpath = Paths.get(path) exists(blocker, fpath).ifM( @@ -126,7 +109,7 @@ object Assets { } } - /** Valid MD5 hash */ + /** MD5 hash. */ final case class Hash private (s: String) extends AnyVal object Hash { @@ -139,132 +122,123 @@ object Assets { stream.through(md5).compile.to(Array).map(fromBytes) } - /** Pair of a tracked `URI` and destination path on local FS (`java.nio.file.Path` is not serializable) */ + /** Pair of a tracked `URI` and destination path on local FS (`java.nio.file.Path` is not serializable). */ type Asset = (URI, String) - /** Initialise the [[updateStream]] with all necessary resources if refresh period is specified */ - def run[F[_]: ConcurrentEffect: ContextShift: Timer: Parallel, A](env: Environment[F, A]): Stream[F, Unit] = - env.assetsUpdatePeriod match { - case Some(duration) => - val init = for { - curDir <- getCurDir - _ <- Logger[F].info(s"Initializing assets refresh stream in $curDir, ticking every $duration") - assets <- env.enrichments.get.map(_.configs.flatMap(_.filesToCache)) - } yield updateStream[F](env.blocker, env.assetsState, env.enrichments, curDir, duration, assets) + case class Downloaded(uri: URI, tpmPath: Path, finalPath: Path, hash: Hash) + + /** Initializes the [[updateStream]] if refresh period is specified. */ + def run[F[_]: ConcurrentEffect: ContextShift: Timer: Parallel, A]( + blocker: Blocker, + sem: Semaphore[F], + updatePeriod: Option[FiniteDuration], + assetsState: Assets.State[F], + enrichments: Ref[F, Environment.Enrichments[F]] + ): Stream[F, Unit] = + updatePeriod match { + case Some(interval) => + val log = Logger[F].info(show"Assets will be updated every $interval") + val init = log.map(_ => updateStream[F](blocker, sem, assetsState, enrichments, interval)) Stream.eval(init).flatten case None => Stream.empty.covary[F] } - def getCurDir[F[_]: Sync]: F[Path] = - Sync[F].delay(Paths.get("").toAbsolutePath) - /** - * At the end of every update, the stop signal will be resumed to `false` - * Create an update stream that ticks periodically and can invoke an update action, - * which will download an URI and check if it has been update. If it has the - * raw stream will be stopped via `stop` signal from [[Environment]] and assets updated + * Creates an update stream that periodically checks if new versions of assets are available. + * If that's the case, updates them locally for the enrichments and updates the state. */ def updateStream[F[_]: ConcurrentEffect: ContextShift: Parallel: Timer]( blocker: Blocker, + sem: Semaphore[F], state: State[F], enrichments: Ref[F, Environment.Enrichments[F]], - curDir: Path, - duration: FiniteDuration, - assets: List[Asset] + interval: FiniteDuration ): Stream[F, Unit] = - Stream.fixedDelay[F](duration).evalMap { _ => - val log = Logger[F].debug(show"Checking remote assets: ${assets.map(_._1).mkString(", ")}") - val reinitialize: F[Unit] = - for { - // side-effecting get-set is inherently not thread-safe - // we need to be sure the state.stop is set to true - // before re-initializing enrichments - _ <- Logger[F].info("Resuming enrich stream") - old <- enrichments.get - _ <- Logger[F].info(show"Reinitializing enrichments: ${old.configs.map(_.schemaKey.name).mkString(", ")}") - fresh <- old.reinitialize(BlockerF.ofBlocker(blocker)) - _ <- enrichments.set(fresh) - _ <- state.pauseEnrich.set(false) - } yield () - - val updated = downloadAndPause[F](blocker, state, curDir, assets) - log *> updated.ifM(reinitialize, Logger[F].debug("No assets have been updated since last check")) + Stream.fixedDelay[F](interval).evalMap { _ => + for { + assets <- getAssetsList(enrichments) + _ <- Logger[F].info(show"Checking if following assets have been updated: ${assets.map(_._1).mkString(", ")}") + curDir <- getCurDir + downloaded <- downloadAll(blocker, curDir, state.clients, assets) + currentHashes <- state.hashes.get + newAssets = findUpdates(currentHashes, downloaded) + _ <- + if(newAssets.isEmpty) + Logger[F].info("All the assets are still the same, no update") + else + sem.withPermit { + update(blocker, state, enrichments, newAssets) + } + } yield () } - /** - * Download list of assets, return false if none has been downloaded - * It also can set `pauseEnrich` into `true` - a caller should make sure it's unpaused - */ - def downloadAndPause[F[_]: ConcurrentEffect: ContextShift: Timer]( + /** Downloads all the assets, each into a temporary path. + * @return For each URI the temporary path and the hash of the file is returned, + * as well as the asset path on disk. + */ + def downloadAll[F[_]: ConcurrentEffect: ContextShift: Timer]( blocker: Blocker, - state: State[F], - dir: Path, + curDir: Path, + clients: Clients[F], assets: List[Asset] - ): F[Boolean] = + ): F[List[Downloaded]] = assets - .traverse { - case (uri, path) => - update(blocker, state, dir, uri, Paths.get(path)) + .traverse { a => + tempFileResource[F](blocker, curDir).use { tmpPath => + downloadAndHash(blocker, clients, a._1, tmpPath) + .map(hash => Downloaded(a._1, tmpPath, Paths.get(a._2), hash)) + } } - .map(_.contains(true)) + + /** Compares the hashes of downloaded assets with existing ones and keeps the different ones. + * @return List of assets that have been updated since last download. + */ + def findUpdates( + currentHashes: Map[URI, Hash], + downloaded: List[Downloaded] + ): List[Downloaded] = + downloaded + .filterNot(a => currentHashes.get(a.uri).contains(a.hash)) /** - * Update a file in current directory if it has been updated on remote storage - * If a new file has been discovered - stops the enriching streams (signal in `state`) - * Do nothing if file hasn't been updated - * - * Note: this function has a potential to be thread-unsafe if download time - * exceeds tick period. We assume that no two threads will be downloading the same URI - * - * @param blocker a thread pool to execute download/copy operations - * @param state a map of URI to MD5 hash to keep track latest state of remote files - * @param curDir a local FS destination for temporary files - * @param uri a remote file (S3, GCS or HTTP), the URI is used as an identificator - * @param path a static file name that enrich clients will access - * file itself is placed in current dir (`dir`) - * @return true if file has been updated - */ - def update[F[_]: ConcurrentEffect: ContextShift: Timer]( + * Performs all the updates after new version of at least an asset is available: + * 1. Replaces the existing file(s) on disk + * 2. Updates the state of the assets with new hash(es) + * 3. Updates the enrichments config + */ + def update[F[_]: ConcurrentEffect: ContextShift]( blocker: Blocker, state: State[F], - curDir: Path, - uri: URI, - path: Path - ): F[Boolean] = - tempFileResource[F](blocker, curDir).use { tmp => - // Set stop signal and replace old file with temporary - def stopAndCopy(hash: Hash, delete: Boolean): F[Unit] = - for { - _ <- Logger[F].info(s"An asset at $uri has been updated since last check, pausing the enrich stream to reinitialize") - _ <- state.pauseEnrich.set(true) - _ <- if (delete) { - val deleted = Logger[F].info(s"Deleted outdated asset $path") - val notDeleted = Logger[F].warn(s"Couldn't delete $path, file didn't exist") - deleteIfExists(blocker, path).ifM(deleted, notDeleted) - } else Sync[F].unit - _ <- copy(blocker, tmp, path) - _ <- state.files.update(_.updated(uri, hash)) - _ <- Logger[F].debug(s"Replaced $uri in Assets.State") - } yield () + enrichments: Ref[F, Environment.Enrichments[F]], + newAssets: List[Downloaded] + ): F[Unit] = + for { + _ <- newAssets.traverse_ { a => + Logger[F].info(s"Remote ${a.uri} has changed, updating locally") *> + move(blocker, a.tpmPath, a.finalPath, List(StandardCopyOption.REPLACE_EXISTING)) + } - for { - hash <- downloadAndHash(state.clients, blocker, uri, tmp) - localFiles <- state.files.get - updated <- localFiles.get(uri) match { - case Some(known) if known == hash => - Sync[F].pure(false) - case Some(_) => - stopAndCopy(hash, true).as(true) - case None => - stopAndCopy(hash, false).as(true) - } - } yield updated - } + _ <- Logger[F].info("Refreshing the state of assets") + hashes <- state.hashes.get + updatedHashes = hashes ++ newAssets.map(a => (a.uri, a.hash)).toMap + _ <- state.hashes.set(updatedHashes) + + _ <- Logger[F].info("Reinitializing enrichments") + old <- enrichments.get + fresh <- old.reinitialize(BlockerF.ofBlocker(blocker)) + _ <- enrichments.set(fresh) + } yield () + + def getCurDir[F[_]: Sync]: F[Path] = + Sync[F].delay(Paths.get("").toAbsolutePath) + + def getAssetsList[F[_]: Sync](enrichments: Ref[F, Environment.Enrichments[F]]): F[List[Asset]] = + enrichments.get.map(_.configs.flatMap(_.filesToCache)) def downloadAndHash[F[_]: Concurrent: ContextShift: Timer]( - clients: Clients[F], blocker: Blocker, + clients: Clients[F], uri: URI, destination: Path ): F[Hash] = { diff --git a/modules/common-fs2/src/main/scala/com/snowplowanalytics/snowplow/enrich/common/fs2/Enrich.scala b/modules/common-fs2/src/main/scala/com/snowplowanalytics/snowplow/enrich/common/fs2/Enrich.scala index 6f7696370..5939d5152 100644 --- a/modules/common-fs2/src/main/scala/com/snowplowanalytics/snowplow/enrich/common/fs2/Enrich.scala +++ b/modules/common-fs2/src/main/scala/com/snowplowanalytics/snowplow/enrich/common/fs2/Enrich.scala @@ -62,7 +62,6 @@ object Enrich { /** * Run a primary enrichment stream, reading from [[Environment]] source, enriching * via [[enrichWith]] and sinking into the Good, Bad, and Pii sinks. - * Can be stopped via _stop signal_ from [[Environment]] * * The stream won't download any enrichment DBs, it is responsibility of [[Assets]] * [[Assets.State.make]] downloads assets for the first time unconditionally during @@ -80,13 +79,13 @@ object Enrich { env.source.chunks // .prefetch TODO .evalTap(chunk => Logger[F].debug(s"Starting to process chunk of size ${chunk.size}")) - .pauseWhen(env.pauseEnrich) .evalTap(chunk => env.metrics.rawCount(chunk.size)) .map(chunk => chunk.map(a => (a, env.getPayload(a)))) .evalMap(chunk => - chunk.toList.map { case (orig, bytes) => enrich(bytes).map((orig, _)) }.parSequenceN(env.streamsSettings.concurrency.enrich) + env.semaphore.withPermit( + chunk.toList.map { case (orig, bytes) => enrich(bytes).map((orig, _)) }.parSequenceN(env.streamsSettings.concurrency.enrich) + ) ) - // .evalMap(chunk => `env.pauser.withPermit(chunk.toList.map { case (orig, bytes) => enrich(bytes).map((orig, _)) }.parSequenceN(64))!!) .prefetch val sinkAndCheckpoint: Pipe[F, List[(A, Result)], Unit] = diff --git a/modules/common-fs2/src/main/scala/com/snowplowanalytics/snowplow/enrich/common/fs2/Environment.scala b/modules/common-fs2/src/main/scala/com/snowplowanalytics/snowplow/enrich/common/fs2/Environment.scala index 14805af83..9df3e2f6e 100644 --- a/modules/common-fs2/src/main/scala/com/snowplowanalytics/snowplow/enrich/common/fs2/Environment.scala +++ b/modules/common-fs2/src/main/scala/com/snowplowanalytics/snowplow/enrich/common/fs2/Environment.scala @@ -18,19 +18,15 @@ import cats.Show import cats.data.EitherT import cats.implicits._ -import cats.effect.{Async, Blocker, Clock, Concurrent, ConcurrentEffect, ContextShift, Resource, Sync, Timer} -import cats.effect.concurrent.Ref +import cats.effect.{Async, Blocker, Clock, ConcurrentEffect, ContextShift, Resource, Sync, Timer} +import cats.effect.concurrent.{Ref, Semaphore} -import fs2.concurrent.SignallingRef import fs2.Stream import _root_.io.circe.Json import _root_.io.sentry.{Sentry, SentryClient} -import org.typelevel.log4cats.Logger -import org.typelevel.log4cats.slf4j.Slf4jLogger - import com.snowplowanalytics.iglu.client.{Client => IgluClient} import com.snowplowanalytics.iglu.client.resolver.registries.{Http4sRegistryLookup, RegistryLookup} @@ -57,8 +53,7 @@ import scala.concurrent.ExecutionContext * @param enrichments enrichment registry with all clients and parsed configuration files * it's wrapped in mutable variable because all resources need to be * reinitialized after DB assets are updated via [[Assets]] stream - * @param pauseEnrich a signalling reference that can pause a raw stream and enrichment, - * should be used only by [[Assets]] + * @param semaphore its permit is shared between enriching the events and updating the assets * @param assetsState a main entity from [[Assets]] stream, controlling when assets * have to be replaced with newer ones * @param blocker thread pool for blocking operations and enrichments themselves @@ -82,7 +77,7 @@ final case class Environment[F[_], A]( igluClient: IgluClient[F, Json], registryLookup: RegistryLookup[F], enrichments: Ref[F, Environment.Enrichments[F]], - pauseEnrich: SignallingRef[F, Boolean], + semaphore: Semaphore[F], assetsState: Assets.State[F], blocker: Blocker, source: Stream[F, A], @@ -102,9 +97,6 @@ final case class Environment[F[_], A]( object Environment { - private implicit def unsafeLogger[F[_]: Sync]: Logger[F] = - Slf4jLogger.getLogger[F] - /** Registry with all allocated clients (MaxMind, IAB etc) and their original configs */ final case class Enrichments[F[_]](registry: EnrichmentRegistry[F], configs: List[EnrichmentConf]) { @@ -153,21 +145,20 @@ object Environment { clts = Clients.init[F](http, clients) igluClient <- IgluClient.parseDefault[F](parsedConfigs.igluJson).resource metrics <- Resource.eval(metricsReporter[F](blocker, file)) - download = parsedConfigs.enrichmentConfigs.flatMap(_.filesToCache) - pauseEnrich <- makePause[F] - assets <- Assets.State.make[F](blocker, pauseEnrich, download, clts) enrichments <- Enrichments.make[F](parsedConfigs.enrichmentConfigs, BlockerF.ofBlocker(blocker)) + sem <- Resource.eval(Semaphore(1L)) + _ <- Resource.eval(sem.acquire) // will be released at the end of assets initialization + assetsState <- Resource.eval(Assets.State.make[F](blocker, sem, clts, enrichments)) sentry <- file.monitoring.flatMap(_.sentry).map(_.dsn) match { case Some(dsn) => Resource.eval[F, Option[SentryClient]](Sync[F].delay(Sentry.init(dsn.toString).some)) case None => Resource.pure[F, Option[SentryClient]](none[SentryClient]) } - _ <- Resource.eval(pauseEnrich.set(false) *> Logger[F].info("Enrich environment initialized")) } yield Environment[F, A]( igluClient, Http4sRegistryLookup(http), enrichments, - pauseEnrich, - assets, + sem, + assetsState, blocker, source, good, @@ -185,14 +176,6 @@ object Environment { ) } - /** - * Make sure `enrichPause` gets into paused state before destroying pipes - * Initialised into `true` because enrich stream should not start until - * [[Assets.State]] is constructed - it will download all assets - */ - def makePause[F[_]: Concurrent]: Resource[F, SignallingRef[F, Boolean]] = - Resource.make(SignallingRef(true))(_.set(true)) - private def metricsReporter[F[_]: ConcurrentEffect: ContextShift: Timer](blocker: Blocker, config: ConfigFile): F[Metrics[F]] = config.monitoring.flatMap(_.metrics).map(Metrics.build[F](blocker, _)).getOrElse(Metrics.noop[F].pure[F]) diff --git a/modules/common-fs2/src/main/scala/com/snowplowanalytics/snowplow/enrich/common/fs2/Run.scala b/modules/common-fs2/src/main/scala/com/snowplowanalytics/snowplow/enrich/common/fs2/Run.scala index 2976a6fda..bf2470716 100644 --- a/modules/common-fs2/src/main/scala/com/snowplowanalytics/snowplow/enrich/common/fs2/Run.scala +++ b/modules/common-fs2/src/main/scala/com/snowplowanalytics/snowplow/enrich/common/fs2/Run.scala @@ -142,7 +142,7 @@ object Run { environment.use { env => val log = Logger[F].info("Running enrichment stream") val enrich = Enrich.run[F, A](env) - val updates = Assets.run[F, A](env) + val updates = Assets.run[F, A](env.blocker, env.semaphore, env.assetsUpdatePeriod, env.assetsState, env.enrichments) val reporting = env.metrics.report val flow = enrich.merge(updates).merge(reporting) log >> flow.compile.drain.as(ExitCode.Success).recoverWith {