
update Assets mechanism with semaphore
benjben committed Dec 10, 2021
1 parent 88f01a3 commit fed1b4d
Showing 11 changed files with 239 additions and 312 deletions.
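The core of the change: the old pause/resume machinery built around a `pauseEnrich: Ref[F, Boolean]` signal is replaced by a single-permit `Semaphore` shared between the enrichment stream and the assets updater. Both sides wrap their critical sections in `withPermit`, which serializes asset swaps against in-flight enrichment batches. A minimal, self-contained sketch of the pattern (all names and timings are illustrative, not code from this commit):

    import scala.concurrent.duration._

    import cats.effect.{ExitCode, IO, IOApp}
    import cats.effect.concurrent.Semaphore
    import fs2.Stream

    object SemaphorePattern extends IOApp {
      def processChunk(n: Int): IO[Unit] = IO(println(s"enriched chunk $n"))
      val refreshAssets: IO[Unit] = IO(println("swapped asset files"))

      def run(args: List[String]): IO[ExitCode] =
        Semaphore[IO](1).flatMap { sem =>
          // Enrichment holds the permit for the duration of each chunk...
          val enriching = Stream.range(0, 10).covary[IO].evalMap(n => sem.withPermit(processChunk(n)))
          // ...so a refresh waits for the in-flight chunk, and the next chunk
          // waits until the refresh has released the permit.
          val updating = Stream.fixedDelay[IO](50.millis).evalMap(_ => sem.withPermit(refreshAssets))
          enriching.concurrently(updating).compile.drain.map(_ => ExitCode.Success)
        }
    }

With a single permit this is plain mutual exclusion, which removes the get/set race the old code had to warn about in comments.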
com/snowplowanalytics/snowplow/enrich/common/fs2/Assets.scala
@@ -13,7 +13,7 @@
package com.snowplowanalytics.snowplow.enrich.common.fs2

import java.net.URI
-import java.nio.file.{Path, Paths}
+import java.nio.file.{Path, Paths, StandardCopyOption}

import scala.concurrent.duration._
import scala.util.control.NonFatal
@@ -22,13 +22,13 @@ import cats.{Applicative, Parallel}
import cats.implicits._

import cats.effect.{Blocker, Concurrent, ConcurrentEffect, ContextShift, Resource, Sync, Timer}
-import cats.effect.concurrent.Ref
+import cats.effect.concurrent.{Ref, Semaphore}

import retry.{RetryDetails, RetryPolicies, RetryPolicy, retryingOnSomeErrors}

import fs2.Stream
import fs2.hash.md5
-import fs2.io.file.{copy, deleteIfExists, exists, readAll, tempFileResource, writeAll}
+import fs2.io.file.{exists, move, readAll, tempFileResource, writeAll}

import org.typelevel.log4cats.Logger
import org.typelevel.log4cats.slf4j.Slf4jLogger
@@ -37,86 +37,66 @@ import com.snowplowanalytics.snowplow.enrich.common.utils.BlockerF

import com.snowplowanalytics.snowplow.enrich.common.fs2.io.Clients

-/**
- * Functions responsible for periodic assets (such as MaxMind/IAB DBs) updates
- * The common logic is to periodically invoke a function that:
- * 1. Downloads a file (in background) to a temp location
- * 2. Compares file's checksum with existing one (stored in a mutable hashmap)
- * 3. If checksums match - delete the temp file, return
- * 4. If checksums don't match - send a signal to stop raw stream
- *    (via `SignallingRef` in [[Environment]])
- * 5. Once raw stream is stopped - delete an old file and move
- *    temp file to the old's file location
- * If any of those URIs been updated and stopped the raw stream, it will be
- * immediately resumed once the above procedure traversed all files
- */
+/** Code in charge of downloading and updating the assets used by enrichments (e.g. MaxMind/IAB DBs). */
object Assets {

private implicit def unsafeLogger[F[_]: Sync]: Logger[F] =
Slf4jLogger.getLogger[F]

/**
-   * State of the [[updateStream]], containing information about tracked URIs
-   * and `stop` signal from [[Environment]] as well as all clients necessary
-   * to download URIs
-   *
-   * @param files mutable hash map of URIs and their latest known state
-   * @param pauseEnrich stop signal coming from [[Environment]] and that can be used
-   *                    to stop the raw stream consumption
-   * @param clients HTTP, GCS, S3 clients if necessary
+   * Contains information about downloaded content and the clients to download URIs.
+   * @param hashes Hash map of URIs and their latest known state (hash).
+   * @param clients Clients to download URIs.
*/
final case class State[F[_]](
-    files: Ref[F, Map[URI, Hash]],
-    pauseEnrich: Ref[F, Boolean],
+    hashes: Ref[F, Map[URI, Hash]],
clients: Clients[F]
)

object State {

-    /** Test pair is used in tests to initialize HTTP client, will be ignored during initialization */
-    private val TestPair: Asset = URI.create("http://localhost:8080") -> "index"
-
/**
-     * Initialize an assets state. Try to find them on local FS
-     * or download if they're missing. Also initializes all necessary
-     * clients (S3, GCP etc)
-     * @param blocker thread pool for downloading and reading files
-     * @param stop global stop signal from [[Environment]]
-     * @param assets all assets that have to be tracked
-     * @param http the HTTP client that we share with other parts of the application
+     * Initializes the assets state.
+     * Tries to find them on the local FS and downloads them if they're missing.
+     * @param blocker Thread pool for downloading and reading files.
+     * @param sem Semaphore shared with the enrichment stream, held while initializing the state.
+     * @param clients Clients to download the URIs.
+     * @param assets List of assets to download and keep track of.
*/
def make[F[_]: ConcurrentEffect: Timer: ContextShift](
blocker: Blocker,
-      stop: Ref[F, Boolean],
-      assets: List[Asset],
-      clients: Clients[F]
-    ): Resource[F, State[F]] =
+      sem: Semaphore[F],
+      clients: Clients[F],
+      assets: List[Asset]
+    ): F[State[F]] =
for {
-        map <- Resource.eval(build[F](blocker, clients, assets.filterNot(asset => asset == TestPair)))
-        files <- Resource.eval(Ref.of[F, Map[URI, Hash]](map))
-      } yield State(files, stop, clients)
+        _ <- sem.acquire
+        map <- build[F](blocker, clients, assets)
+        hashes <- Ref.of[F, Map[URI, Hash]](map)
+        _ <- sem.release
+      } yield State(hashes, clients)
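Since `State.make` now returns `F[State[F]]` instead of a `Resource` and takes the shared semaphore, the caller is expected to create the one-permit semaphore up front and hand the same instance to both sides. A hypothetical wiring sketch (`initAssets` is not part of this commit):

    import cats.effect.{Blocker, ConcurrentEffect, ContextShift, Timer}
    import cats.effect.concurrent.Semaphore
    import cats.implicits._

    import com.snowplowanalytics.snowplow.enrich.common.fs2.Assets
    import com.snowplowanalytics.snowplow.enrich.common.fs2.io.Clients

    def initAssets[F[_]: ConcurrentEffect: ContextShift: Timer](
      blocker: Blocker,
      clients: Clients[F],
      assets: List[Assets.Asset]
    ): F[(Semaphore[F], Assets.State[F])] =
      for {
        sem <- Semaphore[F](1)                                     // one permit = mutual exclusion
        state <- Assets.State.make[F](blocker, sem, clients, assets) // downloads missing assets
      } yield (sem, state)

Because `make` brackets the initial download with `acquire`/`release` on that semaphore, an enrichment stream sharing it cannot start a chunk before the first assets are in place.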

def build[F[_]: Concurrent: Timer: ContextShift](
blocker: Blocker,
clients: Clients[F],
assets: List[Asset]
): F[Map[URI, Hash]] =
-      Logger[F].info("Preparing enrichment assets") *>
+      Logger[F].info("Initializing (downloading) enrichment assets") *>
buildFromLocal(blocker, assets)
.flatMap { hashes =>
hashes.traverse {
case (uri, path, Some(hash)) =>
Logger[F].info(s"Asset from $uri is found on local system at $path").as(uri -> hash)
case (uri, path, None) =>
-            downloadAndHash[F](clients, blocker, uri, Paths.get(path)).map(hash => uri -> hash)
+            downloadAndHash[F](blocker, clients, uri, Paths.get(path)).map(hash => uri -> hash)
}
}
.map(_.toMap)

def buildFromLocal[F[_]: Sync: ContextShift](blocker: Blocker, assets: List[Asset]): F[List[(URI, String, Option[Hash])]] =
assets.traverse { case (uri, path) => local[F](blocker, path).map(hash => (uri, path, hash)) }

-  /** Check if file already exists */
+  /** Checks if file already exists on filesystem. */
def local[F[_]: Sync: ContextShift](blocker: Blocker, path: String): F[Option[Hash]] = {
val fpath = Paths.get(path)
exists(blocker, fpath).ifM(
@@ -126,7 +106,7 @@ object Assets {
}
}

-  /** Valid MD5 hash */
+  /** MD5 hash. */
final case class Hash private (s: String) extends AnyVal

object Hash {
@@ -139,132 +119,123 @@
stream.through(md5).compile.to(Array).map(fromBytes)
}
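For reference, `Hash.fromStream` digests the raw bytes with fs2's built-in `md5` pipe, so hashing a local file is one line (a sketch; the 1024-byte chunk size is illustrative, and `fromStream` is assumed to have the `Stream[F, Byte] => F[Hash]` shape its body above suggests):

    import java.nio.file.Paths

    import cats.effect.{Blocker, ContextShift, Sync}
    import fs2.io.file.readAll

    def hashFile[F[_]: Sync: ContextShift](blocker: Blocker, path: String): F[Hash] =
      Hash.fromStream(readAll[F](Paths.get(path), blocker, 1024))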

-  /** Pair of a tracked `URI` and destination path on local FS (`java.nio.file.Path` is not serializable) */
+  /** Pair of a tracked `URI` and destination path on local FS (`java.nio.file.Path` is not serializable). */
type Asset = (URI, String)

-  /** Initialise the [[updateStream]] with all necessary resources if refresh period is specified */
-  def run[F[_]: ConcurrentEffect: ContextShift: Timer: Parallel, A](env: Environment[F, A]): Stream[F, Unit] =
-    env.assetsUpdatePeriod match {
-      case Some(duration) =>
+  case class Downloaded(uri: URI, tmpPath: Path, finalPath: Path, hash: Hash)
+
+  /** Initializes the [[updateStream]] if refresh period is specified. */
+  def run[F[_]: ConcurrentEffect: ContextShift: Parallel: Timer, A](
+    blocker: Blocker,
+    sem: Semaphore[F],
+    updatePeriod: Option[FiniteDuration],
+    assetsState: Assets.State[F],
+    enrichments: Ref[F, Environment.Enrichments[F]]
+  ): Stream[F, Unit] =
+    updatePeriod match {
+      case Some(interval) =>
val init = for {
-          curDir <- getCurDir
-          _ <- Logger[F].info(s"Initializing assets refresh stream in $curDir, ticking every $duration")
-          assets <- env.enrichments.get.map(_.configs.flatMap(_.filesToCache))
-        } yield updateStream[F](env.blocker, env.assetsState, env.enrichments, curDir, duration, assets)
+          _ <- Logger[F].info(show"Assets will be checked every $interval")
+          assets <- enrichments.get.map(_.configs.flatMap(_.filesToCache))
+        } yield updateStream[F](blocker, sem, assetsState, enrichments, interval, assets)
Stream.eval(init).flatten
case None =>
Stream.empty.covary[F]
}
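A hypothetical call site, to make the new parameter list concrete (the 10-minute period and `Array[Byte]` record type are made up; passing `None` disables the refresh stream entirely, as the `case None` branch above shows):

    import scala.concurrent.duration._

    val refresh: Stream[F, Unit] =
      Assets.run[F, Array[Byte]](blocker, sem, Some(10.minutes), assetsState, enrichments)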

-  def getCurDir[F[_]: Sync]: F[Path] =
-    Sync[F].delay(Paths.get("").toAbsolutePath)
-
/**
-   * At the end of every update, the stop signal will be resumed to `false`
-   * Create an update stream that ticks periodically and can invoke an update action,
-   * which will download an URI and check if it has been update. If it has the
-   * raw stream will be stopped via `stop` signal from [[Environment]] and assets updated
+   * Creates an update stream that periodically checks if new versions of assets are available.
+   * If that's the case, updates them locally for the enrichments and updates the state.
*/
def updateStream[F[_]: ConcurrentEffect: ContextShift: Parallel: Timer](
blocker: Blocker,
+    sem: Semaphore[F],
state: State[F],
enrichments: Ref[F, Environment.Enrichments[F]],
-    curDir: Path,
-    duration: FiniteDuration,
+    interval: FiniteDuration,
assets: List[Asset]
): Stream[F, Unit] =
-    Stream.fixedDelay[F](duration).evalMap { _ =>
-      val log = Logger[F].debug(show"Checking remote assets: ${assets.map(_._1).mkString(", ")}")
-      val reinitialize: F[Unit] =
-        for {
-          // side-effecting get-set is inherently not thread-safe
-          // we need to be sure the state.stop is set to true
-          // before re-initializing enrichments
-          _ <- Logger[F].info("Resuming enrich stream")
-          old <- enrichments.get
-          _ <- Logger[F].info(show"Reinitializing enrichments: ${old.configs.map(_.schemaKey.name).mkString(", ")}")
-          fresh <- old.reinitialize(BlockerF.ofBlocker(blocker))
-          _ <- enrichments.set(fresh)
-          _ <- state.pauseEnrich.set(false)
-        } yield ()
-
-      val updated = downloadAndPause[F](blocker, state, curDir, assets)
-      log *> updated.ifM(reinitialize, Logger[F].debug("No assets have been updated since last check"))
+    Stream.fixedDelay[F](interval).evalMap { _ =>
+      for {
+        _ <- Logger[F].info(show"Checking if the following assets have been updated: ${assets.map(_._1).mkString(", ")}")
+        curDir <- getCurDir
+        currentHashes <- state.hashes.get
+        downloaded = downloadAll(blocker, curDir, state.clients, assets)
+        _ <- downloaded.use { files =>
+               val newAssets = findUpdates(currentHashes, files)
+               if (newAssets.isEmpty)
+                 Logger[F].info("All the assets are still the same, no update")
+               else
+                 sem.withPermit {
+                   update(blocker, state, enrichments, newAssets)
+                 }
+             }
+      } yield ()
}
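Worth spelling out: `downloadAll` returns a `Resource`, so the temporary files only live inside `use`; whatever has not been moved into place by `update` is cleaned up automatically when `use` exits. A stand-alone sketch of that lifecycle (plain strings stand in for temp files):

    import cats.effect.{IO, Resource}

    // Release runs after `use` returns, mirroring how tempFileResource deletes
    // its file once the compare-and-move step has finished.
    val tempFile: Resource[IO, String] =
      Resource.make(IO(println("temp file created")).map(_ => "tmp-123"))(_ => IO(println("temp file deleted")))

    val cycle: IO[Unit] =
      tempFile.use(tmp => IO(println(s"comparing hashes for $tmp, moving into place if changed")))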

-  /**
-   * Download list of assets, return false if none has been downloaded
-   * It also can set `pauseEnrich` into `true` - a caller should make sure it's unpaused
-   */
-  def downloadAndPause[F[_]: ConcurrentEffect: ContextShift: Timer](
+  /** Downloads all the assets, each into a temporary path.
+    * @return For each URI, the temporary path of the downloaded file and its hash,
+    *         together with the asset's final path on disk.
+    */
+  def downloadAll[F[_]: ConcurrentEffect: ContextShift: Timer](
blocker: Blocker,
-    state: State[F],
dir: Path,
+    clients: Clients[F],
assets: List[Asset]
-  ): F[Boolean] =
+  ): Resource[F, List[Downloaded]] =
assets
-      .traverse {
-        case (uri, path) =>
-          update(blocker, state, dir, uri, Paths.get(path))
+      .traverse { a =>
+        tempFileResource[F](blocker, dir).evalMap { tmpPath =>
+          downloadAndHash(blocker, clients, a._1, tmpPath)
+            .map(hash => Downloaded(a._1, tmpPath, Paths.get(a._2), hash))
+        }
+      }
-      .map(_.contains(true))

+  /** Compares the hashes of downloaded assets with the existing ones and keeps the assets whose hash changed.
+    * @return List of assets that have been updated since last download.
+    */
+  def findUpdates(
+    currentHashes: Map[URI, Hash],
+    downloaded: List[Downloaded]
+  ): List[Downloaded] =
+    downloaded
+      .filterNot(a => currentHashes.get(a.uri).contains(a.hash))
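`findUpdates` is pure, so its contract is easy to pin down with a schematic example (hashes shown as strings for brevity; real values come from `Hash.fromStream`):

    // currentHashes = Map(geoUri -> Hash("aaa"))
    // downloaded    = List(
    //   Downloaded(geoUri, tmp1, final1, Hash("aaa")), // same hash   -> dropped
    //   Downloaded(iabUri, tmp2, final2, Hash("bbb"))  // unknown URI -> kept
    // )
    // findUpdates(currentHashes, downloaded) == List(Downloaded(iabUri, tmp2, final2, Hash("bbb")))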

/**
-   * Update a file in current directory if it has been updated on remote storage
-   * If a new file has been discovered - stops the enriching streams (signal in `state`)
-   * Do nothing if file hasn't been updated
-   *
-   * Note: this function has a potential to be thread-unsafe if download time
-   * exceeds tick period. We assume that no two threads will be downloading the same URI
-   *
-   * @param blocker a thread pool to execute download/copy operations
-   * @param state a map of URI to MD5 hash to keep track latest state of remote files
-   * @param curDir a local FS destination for temporary files
-   * @param uri a remote file (S3, GCS or HTTP), the URI is used as an identificator
-   * @param path a static file name that enrich clients will access
-   *             file itself is placed in current dir (`dir`)
-   * @return true if file has been updated
-   */
-  def update[F[_]: ConcurrentEffect: ContextShift: Timer](
+   * Performs all the updates after a new version of at least one asset is available:
+   * 1. Replaces the existing file(s) on disk
+   * 2. Updates the state of the assets with the new hash(es)
+   * 3. Updates the enrichments config
+   */
+  def update[F[_]: ConcurrentEffect: ContextShift](
blocker: Blocker,
state: State[F],
-    curDir: Path,
-    uri: URI,
-    path: Path
-  ): F[Boolean] =
-    tempFileResource[F](blocker, curDir).use { tmp =>
-      // Set stop signal and replace old file with temporary
-      def stopAndCopy(hash: Hash, delete: Boolean): F[Unit] =
-        for {
-          _ <- Logger[F].info(s"An asset at $uri has been updated since last check, pausing the enrich stream to reinitialize")
-          _ <- state.pauseEnrich.set(true)
-          _ <- if (delete) {
-                 val deleted = Logger[F].info(s"Deleted outdated asset $path")
-                 val notDeleted = Logger[F].warn(s"Couldn't delete $path, file didn't exist")
-                 deleteIfExists(blocker, path).ifM(deleted, notDeleted)
-               } else Sync[F].unit
-          _ <- copy(blocker, tmp, path)
-          _ <- state.files.update(_.updated(uri, hash))
-          _ <- Logger[F].debug(s"Replaced $uri in Assets.State")
-        } yield ()
+    enrichments: Ref[F, Environment.Enrichments[F]],
+    newAssets: List[Downloaded]
+  ): F[Unit] =
+    for {
+      _ <- newAssets.traverse_ { a =>
+             Logger[F].info(s"Remote ${a.uri} has changed, updating it locally") *>
+               move(blocker, a.tmpPath, a.finalPath, List(StandardCopyOption.REPLACE_EXISTING))
+           }

-      for {
-        hash <- downloadAndHash(state.clients, blocker, uri, tmp)
-        localFiles <- state.files.get
-        updated <- localFiles.get(uri) match {
-                     case Some(known) if known == hash =>
-                       Sync[F].pure(false)
-                     case Some(_) =>
-                       stopAndCopy(hash, true).as(true)
-                     case None =>
-                       stopAndCopy(hash, false).as(true)
-                   }
-      } yield updated
-    }
+      _ <- Logger[F].info("Refreshing the state of assets")
+      hashes <- state.hashes.get
+      updatedHashes = hashes ++ newAssets.map(a => (a.uri, a.hash)).toMap
+      _ <- state.hashes.set(updatedHashes)
+
+      _ <- Logger[F].info("Reinitializing enrichments")
+      old <- enrichments.get
+      fresh <- old.reinitialize(BlockerF.ofBlocker(blocker))
+      _ <- enrichments.set(fresh)
+    } yield ()
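The swap itself relies on `fs2.io.file.move` with `REPLACE_EXISTING`, which delegates to the NIO call sketched below. Temp files are created in the same working directory as the final paths (see `getCurDir`), so on a typical setup this is a same-filesystem rename rather than a copy; and since the whole `update` runs under `sem.withPermit`, no enrichment batch is in flight while files are replaced.

    import java.nio.file.{Files, Path, StandardCopyOption}

    // Sketch of the underlying operation performed for each updated asset.
    def swap(tmp: Path, dest: Path): Path =
      Files.move(tmp, dest, StandardCopyOption.REPLACE_EXISTING)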

+  def getCurDir[F[_]: Sync]: F[Path] =
+    Sync[F].delay(Paths.get("").toAbsolutePath)
+
def downloadAndHash[F[_]: Concurrent: ContextShift: Timer](
-    clients: Clients[F],
blocker: Blocker,
+    clients: Clients[F],
uri: URI,
destination: Path
): F[Hash] = {
com/snowplowanalytics/snowplow/enrich/common/fs2/Enrich.scala
@@ -62,7 +62,6 @@ object Enrich {
/**
* Run a primary enrichment stream, reading from [[Environment]] source, enriching
* via [[enrichWith]] and sinking into the Good, Bad, and Pii sinks.
-   * Can be stopped via _stop signal_ from [[Environment]]
*
* The stream won't download any enrichment DBs, it is responsibility of [[Assets]]
* [[Assets.State.make]] downloads assets for the first time unconditionally during
@@ -80,13 +79,13 @@
env.source.chunks
// .prefetch TODO
.evalTap(chunk => Logger[F].debug(s"Starting to process chunk of size ${chunk.size}"))
-      .pauseWhen(env.pauseEnrich)
.evalTap(chunk => env.metrics.rawCount(chunk.size))
.map(chunk => chunk.map(a => (a, env.getPayload(a))))
.evalMap(chunk =>
-          chunk.toList.map { case (orig, bytes) => enrich(bytes).map((orig, _)) }.parSequenceN(env.streamsSettings.concurrency.enrich)
+          env.semaphore.withPermit(
+            chunk.toList.map { case (orig, bytes) => enrich(bytes).map((orig, _)) }.parSequenceN(env.streamsSettings.concurrency.enrich)
+          )
)
-      // .evalMap(chunk => env.pauser.withPermit(chunk.toList.map { case (orig, bytes) => enrich(bytes).map((orig, _)) }.parSequenceN(64)))
.prefetch

val sinkAndCheckpoint: Pipe[F, List[(A, Result)], Unit] =
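The permit is held only around the CPU-bound enrichment of one chunk, not around sinking or checkpointing, so the window during which an asset refresh can block the stream stays small. A toy version of the gate (with `Int => Int` standing in for enrichment and a made-up parallelism of 4):

    import cats.effect.{ContextShift, IO}
    import cats.effect.concurrent.Semaphore
    import cats.implicits._
    import fs2.{Chunk, Stream}

    def gated(sem: Semaphore[IO], source: Stream[IO, Int])(implicit cs: ContextShift[IO]): Stream[IO, List[Int]] =
      source.chunks.evalMap { chunk: Chunk[Int] =>
        sem.withPermit(chunk.toList.map(n => IO(n + 1)).parSequenceN(4))
      }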