update Assets mechanism with semaphore
benjben committed Dec 8, 2021
1 parent 88f01a3 commit 6dc77b5
Showing 4 changed files with 130 additions and 174 deletions.
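For context: this commit replaces the previous pause/resume signalling (a `pauseEnrich` ref that the assets updater toggled to stop the raw stream) with a single semaphore shared between the enrich stream and the assets updater. The sketch below illustrates the handshake under two assumptions: cats-effect 2 (which the codebase uses) and a semaphore created with zero permits, inferred from the `sem.release` in `State.make` further down. All names (`SemaphoreHandshake`, `initAssets`, `refreshAssets`, ...) are illustrative, not the project's API.

import cats.effect.{ExitCode, IO, IOApp}
import cats.effect.concurrent.Semaphore
import cats.implicits._

object SemaphoreHandshake extends IOApp {

  // Assets initialization: download everything once, then release the
  // single permit so that enriching can start.
  def initAssets(sem: Semaphore[IO]): IO[Unit] =
    IO(println("downloading assets for the first time")) *> sem.release

  // Enriching: every chunk is processed while holding the permit.
  def enrichChunk(sem: Semaphore[IO], i: Int): IO[Unit] =
    sem.withPermit(IO(println(s"enriching chunk $i")))

  // Asset refresh: also needs the permit, so it can never run in the
  // middle of a chunk being enriched.
  def refreshAssets(sem: Semaphore[IO]): IO[Unit] =
    sem.withPermit(IO(println("swapping asset files, reinitializing enrichments")))

  def run(args: List[String]): IO[ExitCode] =
    for {
      sem <- Semaphore[IO](0) // zero permits: enriching is blocked until init releases one
      _   <- initAssets(sem)
      _   <- (1 to 3).toList.traverse_(enrichChunk(sem, _))
      _   <- refreshAssets(sem)
    } yield ExitCode.Success
}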
@@ -13,22 +13,22 @@
package com.snowplowanalytics.snowplow.enrich.common.fs2

import java.net.URI
import java.nio.file.{Path, Paths}
import java.nio.file.{Path, Paths, StandardCopyOption}

import scala.concurrent.duration._
import scala.util.control.NonFatal

import cats.{Applicative, Parallel}
import cats.implicits._

import cats.effect.{Blocker, Concurrent, ConcurrentEffect, ContextShift, Resource, Sync, Timer}
import cats.effect.concurrent.Ref
import cats.effect.{Blocker, Concurrent, ConcurrentEffect, ContextShift, Sync, Timer}
import cats.effect.concurrent.{Ref, Semaphore}

import retry.{RetryDetails, RetryPolicies, RetryPolicy, retryingOnSomeErrors}

import fs2.Stream
import fs2.hash.md5
import fs2.io.file.{copy, deleteIfExists, exists, readAll, tempFileResource, writeAll}
import fs2.io.file.{exists, move, readAll, tempFileResource, writeAll}

import org.typelevel.log4cats.Logger
import org.typelevel.log4cats.slf4j.Slf4jLogger
@@ -37,64 +37,47 @@ import com.snowplowanalytics.snowplow.enrich.common.utils.BlockerF

import com.snowplowanalytics.snowplow.enrich.common.fs2.io.Clients

/**
* Functions responsible for periodic assets (such as MaxMind/IAB DBs) updates
* The common logic is to periodically invoke a function that:
* 1. Downloads a file (in background) to a temp location
* 2. Compares file's checksum with existing one (stored in a mutable hashmap)
* 3. If checksums match - delete the temp file, return
* 4. If checksums don't match - send a signal to stop raw stream
* (via `SignallingRef` in [[Environment]])
* 5. Once raw stream is stopped - delete an old file and move
* temp file to the old's file location
* If any of those URIs been updated and stopped the raw stream, it will be
* immediately resumed once the above procedure traversed all files
*/
/** Code in charge of downloading and updating the assets used by enrichments (e.g. MaxMind/IAB DBs). */
object Assets {

private implicit def unsafeLogger[F[_]: Sync]: Logger[F] =
Slf4jLogger.getLogger[F]

/**
* State of the [[updateStream]], containing information about tracked URIs
* and `stop` signal from [[Environment]] as well as all clients necessary
* to download URIs
*
* @param files mutable hash map of URIs and their latest known state
* @param pauseEnrich stop signal coming from [[Environment]] and that can be used
* to stop the raw stream consumption
* @param clients HTTP, GCS, S3 clients if necessary
* Contains information about downloaded content and the clients to download URIs.
* @param hashes Hash map of URIs and their latest known state (hash).
* @param clients Clients to download URIs.
*/
final case class State[F[_]](
files: Ref[F, Map[URI, Hash]],
pauseEnrich: Ref[F, Boolean],
hashes: Ref[F, Map[URI, Hash]],
clients: Clients[F]
)

object State {

/** Test pair is used in tests to initialize HTTP client, will be ignored during initialization */
/** Test pair used in tests to initialize the HTTP client; it is ignored during initialization. */
private val TestPair: Asset = URI.create("http://localhost:8080") -> "index"

/**
* Initialize an assets state. Try to find them on local FS
* or download if they're missing. Also initializes all necessary
* clients (S3, GCP etc)
* @param blocker thread pool for downloading and reading files
* @param stop global stop signal from [[Environment]]
* @param assets all assets that have to be tracked
* @param http the HTTP client that we share with other parts of the application
* Initializes an assets state.
* Tries to find them on the local FS and downloads them if they're missing.
* @param blocker Thread pool for downloading and reading files.
* @param sem Semaphore shared with the enrich stream; its permit is released once the state has been initialized.
* @param clients Clients to download the URIs.
* @param enrichments Configurations of the enrichments. Contains the list of assets.
*/
def make[F[_]: ConcurrentEffect: Timer: ContextShift](
blocker: Blocker,
stop: Ref[F, Boolean],
assets: List[Asset],
clients: Clients[F]
): Resource[F, State[F]] =
sem: Semaphore[F],
clients: Clients[F],
enrichments: Ref[F, Environment.Enrichments[F]]
): F[State[F]] =
for {
map <- Resource.eval(build[F](blocker, clients, assets.filterNot(asset => asset == TestPair)))
files <- Resource.eval(Ref.of[F, Map[URI, Hash]](map))
} yield State(files, stop, clients)
assets <- getAssetsList(enrichments)
map <- build[F](blocker, clients, assets.filterNot(asset => asset == TestPair))
hashes <- Ref.of[F, Map[URI, Hash]](map)
_ <- sem.release
} yield State(hashes, clients)

def build[F[_]: Concurrent: Timer: ContextShift](
blocker: Blocker,
@@ -108,15 +91,15 @@ object Assets {
case (uri, path, Some(hash)) =>
Logger[F].info(s"Asset from $uri is found on local system at $path").as(uri -> hash)
case (uri, path, None) =>
downloadAndHash[F](clients, blocker, uri, Paths.get(path)).map(hash => uri -> hash)
downloadAndHash[F](blocker, clients, uri, Paths.get(path)).map(hash => uri -> hash)
}
}
.map(_.toMap)

def buildFromLocal[F[_]: Sync: ContextShift](blocker: Blocker, assets: List[Asset]): F[List[(URI, String, Option[Hash])]] =
assets.traverse { case (uri, path) => local[F](blocker, path).map(hash => (uri, path, hash)) }

/** Check if file already exists */
/** Checks if the file already exists on the filesystem. */
def local[F[_]: Sync: ContextShift](blocker: Blocker, path: String): F[Option[Hash]] = {
val fpath = Paths.get(path)
exists(blocker, fpath).ifM(
@@ -126,7 +109,7 @@
}
}

/** Valid MD5 hash */
/** MD5 hash. */
final case class Hash private (s: String) extends AnyVal

object Hash {
@@ -139,132 +122,123 @@
stream.through(md5).compile.to(Array).map(fromBytes)
}

/** Pair of a tracked `URI` and destination path on local FS (`java.nio.file.Path` is not serializable) */
/** Pair of a tracked `URI` and destination path on local FS (`java.nio.file.Path` is not serializable). */
type Asset = (URI, String)

/** Initialise the [[updateStream]] with all necessary resources if refresh period is specified */
def run[F[_]: ConcurrentEffect: ContextShift: Timer: Parallel, A](env: Environment[F, A]): Stream[F, Unit] =
env.assetsUpdatePeriod match {
case Some(duration) =>
val init = for {
curDir <- getCurDir
_ <- Logger[F].info(s"Initializing assets refresh stream in $curDir, ticking every $duration")
assets <- env.enrichments.get.map(_.configs.flatMap(_.filesToCache))
} yield updateStream[F](env.blocker, env.assetsState, env.enrichments, curDir, duration, assets)
case class Downloaded(uri: URI, tmpPath: Path, finalPath: Path, hash: Hash)

/** Initializes the [[updateStream]] if a refresh period is specified. */
def run[F[_]: ConcurrentEffect: ContextShift: Timer: Parallel, A](
blocker: Blocker,
sem: Semaphore[F],
updatePeriod: Option[FiniteDuration],
assetsState: Assets.State[F],
enrichments: Ref[F, Environment.Enrichments[F]]
): Stream[F, Unit] =
updatePeriod match {
case Some(interval) =>
val log = Logger[F].info(show"Assets will be updated every $interval")
val init = log.map(_ => updateStream[F](blocker, sem, assetsState, enrichments, interval))
Stream.eval(init).flatten
case None =>
Stream.empty.covary[F]
}

def getCurDir[F[_]: Sync]: F[Path] =
Sync[F].delay(Paths.get("").toAbsolutePath)

/**
* At the end of every update, the stop signal will be resumed to `false`
* Create an update stream that ticks periodically and can invoke an update action,
* which will download an URI and check if it has been update. If it has the
* raw stream will be stopped via `stop` signal from [[Environment]] and assets updated
* Creates an update stream that periodically checks if new versions of assets are available.
* If that's the case, updates them locally for the enrichments and updates the state.
*/
def updateStream[F[_]: ConcurrentEffect: ContextShift: Parallel: Timer](
blocker: Blocker,
sem: Semaphore[F],
state: State[F],
enrichments: Ref[F, Environment.Enrichments[F]],
curDir: Path,
duration: FiniteDuration,
assets: List[Asset]
interval: FiniteDuration
): Stream[F, Unit] =
Stream.fixedDelay[F](duration).evalMap { _ =>
val log = Logger[F].debug(show"Checking remote assets: ${assets.map(_._1).mkString(", ")}")
val reinitialize: F[Unit] =
for {
// side-effecting get-set is inherently not thread-safe
// we need to be sure the state.stop is set to true
// before re-initializing enrichments
_ <- Logger[F].info("Resuming enrich stream")
old <- enrichments.get
_ <- Logger[F].info(show"Reinitializing enrichments: ${old.configs.map(_.schemaKey.name).mkString(", ")}")
fresh <- old.reinitialize(BlockerF.ofBlocker(blocker))
_ <- enrichments.set(fresh)
_ <- state.pauseEnrich.set(false)
} yield ()

val updated = downloadAndPause[F](blocker, state, curDir, assets)
log *> updated.ifM(reinitialize, Logger[F].debug("No assets have been updated since last check"))
Stream.fixedDelay[F](interval).evalMap { _ =>
for {
assets <- getAssetsList(enrichments)
_ <- Logger[F].info(show"Checking if following assets have been updated: ${assets.map(_._1).mkString(", ")}")
curDir <- getCurDir
downloaded <- downloadAll(blocker, curDir, state.clients, assets)
currentHashes <- state.hashes.get
newAssets = findUpdates(currentHashes, downloaded)
_ <-
if (newAssets.isEmpty)
Logger[F].info("All the assets are still the same, no update")
else
sem.withPermit {
update(blocker, state, enrichments, newAssets)
}
} yield ()
}

/**
* Download list of assets, return false if none has been downloaded
* It also can set `pauseEnrich` into `true` - a caller should make sure it's unpaused
*/
def downloadAndPause[F[_]: ConcurrentEffect: ContextShift: Timer](
/** Downloads all the assets, each into a temporary path.
* @return For each URI, the temporary path and the hash of the downloaded file,
* as well as the final asset path on disk.
*/
def downloadAll[F[_]: ConcurrentEffect: ContextShift: Timer](
blocker: Blocker,
state: State[F],
dir: Path,
curDir: Path,
clients: Clients[F],
assets: List[Asset]
): F[Boolean] =
): F[List[Downloaded]] =
assets
.traverse {
case (uri, path) =>
update(blocker, state, dir, uri, Paths.get(path))
.traverse { a =>
tempFileResource[F](blocker, curDir).use { tmpPath =>
downloadAndHash(blocker, clients, a._1, tmpPath)
.map(hash => Downloaded(a._1, tmpPath, Paths.get(a._2), hash))
}
}
.map(_.contains(true))

/** Compares the hashes of the downloaded assets with the existing ones and keeps those that differ.
* @return List of assets that have been updated since the last download.
*/
def findUpdates(
currentHashes: Map[URI, Hash],
downloaded: List[Downloaded]
): List[Downloaded] =
downloaded
.filterNot(a => currentHashes.get(a.uri).contains(a.hash))

/**
* Update a file in current directory if it has been updated on remote storage
* If a new file has been discovered - stops the enriching streams (signal in `state`)
* Do nothing if file hasn't been updated
*
* Note: this function has a potential to be thread-unsafe if download time
* exceeds tick period. We assume that no two threads will be downloading the same URI
*
* @param blocker a thread pool to execute download/copy operations
* @param state a map of URI to MD5 hash to keep track latest state of remote files
* @param curDir a local FS destination for temporary files
* @param uri a remote file (S3, GCS or HTTP), the URI is used as an identificator
* @param path a static file name that enrich clients will access
* file itself is placed in current dir (`dir`)
* @return true if file has been updated
*/
def update[F[_]: ConcurrentEffect: ContextShift: Timer](
* Performs all the updates once a new version of at least one asset is available:
* 1. Replaces the existing file(s) on disk
* 2. Updates the state of the assets with new hash(es)
* 3. Updates the enrichments config
*/
def update[F[_]: ConcurrentEffect: ContextShift](
blocker: Blocker,
state: State[F],
curDir: Path,
uri: URI,
path: Path
): F[Boolean] =
tempFileResource[F](blocker, curDir).use { tmp =>
// Set stop signal and replace old file with temporary
def stopAndCopy(hash: Hash, delete: Boolean): F[Unit] =
for {
_ <- Logger[F].info(s"An asset at $uri has been updated since last check, pausing the enrich stream to reinitialize")
_ <- state.pauseEnrich.set(true)
_ <- if (delete) {
val deleted = Logger[F].info(s"Deleted outdated asset $path")
val notDeleted = Logger[F].warn(s"Couldn't delete $path, file didn't exist")
deleteIfExists(blocker, path).ifM(deleted, notDeleted)
} else Sync[F].unit
_ <- copy(blocker, tmp, path)
_ <- state.files.update(_.updated(uri, hash))
_ <- Logger[F].debug(s"Replaced $uri in Assets.State")
} yield ()
enrichments: Ref[F, Environment.Enrichments[F]],
newAssets: List[Downloaded]
): F[Unit] =
for {
_ <- newAssets.traverse_ { a =>
Logger[F].info(s"Remote ${a.uri} has changed, updating locally") *>
move(blocker, a.tmpPath, a.finalPath, List(StandardCopyOption.REPLACE_EXISTING))
}

for {
hash <- downloadAndHash(state.clients, blocker, uri, tmp)
localFiles <- state.files.get
updated <- localFiles.get(uri) match {
case Some(known) if known == hash =>
Sync[F].pure(false)
case Some(_) =>
stopAndCopy(hash, true).as(true)
case None =>
stopAndCopy(hash, false).as(true)
}
} yield updated
}
_ <- Logger[F].info("Refreshing the state of assets")
hashes <- state.hashes.get
updatedHashes = hashes ++ newAssets.map(a => (a.uri, a.hash)).toMap
_ <- state.hashes.set(updatedHashes)

_ <- Logger[F].info("Reinitializing enrichments")
old <- enrichments.get
fresh <- old.reinitialize(BlockerF.ofBlocker(blocker))
_ <- enrichments.set(fresh)
} yield ()

def getCurDir[F[_]: Sync]: F[Path] =
Sync[F].delay(Paths.get("").toAbsolutePath)

def getAssetsList[F[_]: Sync](enrichments: Ref[F, Environment.Enrichments[F]]): F[List[Asset]] =
enrichments.get.map(_.configs.flatMap(_.filesToCache))

def downloadAndHash[F[_]: Concurrent: ContextShift: Timer](
clients: Clients[F],
blocker: Blocker,
clients: Clients[F],
uri: URI,
destination: Path
): F[Hash] = {
…
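Aside: all the change detection in this file rests on `Hash.fromStream`, which pipes the downloaded bytes through fs2's `md5` and stores one digest per URI. Below is a self-contained sketch of the same computation on a local file, assuming fs2 2.x and cats-effect 2; `Md5Sketch` and `md5Hex` are illustrative names, and the hex rendering is for display only (the project keeps the raw digest wrapped in `Hash`).

import java.nio.file.Paths

import scala.concurrent.ExecutionContext

import cats.effect.{Blocker, ContextShift, IO}

import fs2.hash.md5
import fs2.io.file.readAll

object Md5Sketch {
  implicit val cs: ContextShift[IO] = IO.contextShift(ExecutionContext.global)

  // Stream the file through the MD5 pipe and render the 16 digest bytes as hex.
  def md5Hex(path: String): IO[String] =
    Blocker[IO].use { blocker =>
      readAll[IO](Paths.get(path), blocker, chunkSize = 4096)
        .through(md5)
        .compile
        .to(Array)
        .map(_.map("%02x".format(_)).mkString)
    }
}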
@@ -62,7 +62,6 @@ object Enrich {
/**
* Run a primary enrichment stream, reading from [[Environment]] source, enriching
* via [[enrichWith]] and sinking into the Good, Bad, and Pii sinks.
* Can be stopped via _stop signal_ from [[Environment]]
*
* The stream won't download any enrichment DBs; that is the responsibility of [[Assets]].
* [[Assets.State.make]] downloads assets for the first time unconditionally during
@@ -80,13 +79,13 @@
env.source.chunks
// .prefetch TODO
.evalTap(chunk => Logger[F].debug(s"Starting to process chunk of size ${chunk.size}"))
.pauseWhen(env.pauseEnrich)
.evalTap(chunk => env.metrics.rawCount(chunk.size))
.map(chunk => chunk.map(a => (a, env.getPayload(a))))
.evalMap(chunk =>
chunk.toList.map { case (orig, bytes) => enrich(bytes).map((orig, _)) }.parSequenceN(env.streamsSettings.concurrency.enrich)
env.semaphore.withPermit(
chunk.toList.map { case (orig, bytes) => enrich(bytes).map((orig, _)) }.parSequenceN(env.streamsSettings.concurrency.enrich)
)
)
.prefetch

val sinkAndCheckpoint: Pipe[F, List[(A, Result)], Unit] =
…
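This hunk is the consumer side of the semaphore: the parallel enrichment of a whole chunk now runs under `withPermit`, so an asset swap (which also takes the permit) can only happen between chunks, never in the middle of one. A minimal sketch of that shape, assuming cats-effect 2; `enrichOne` and the hard-coded concurrency of 64 are placeholders for the real enrichment function and `env.streamsSettings.concurrency.enrich`:

import scala.concurrent.ExecutionContext

import cats.effect.{ContextShift, IO}
import cats.effect.concurrent.Semaphore
import cats.effect.implicits._
import cats.implicits._

object EnrichChunkSketch {
  implicit val cs: ContextShift[IO] = IO.contextShift(ExecutionContext.global)

  // Stand-in for the real per-event enrichment.
  def enrichOne(bytes: Array[Byte]): IO[String] =
    IO.pure(new String(bytes))

  // Events of the chunk are enriched in parallel (at most 64 at a time),
  // while the chunk as a whole holds the shared permit for its entire duration.
  def enrichChunk(sem: Semaphore[IO], chunk: List[Array[Byte]]): IO[List[String]] =
    sem.withPermit(
      chunk.map(enrichOne).parSequenceN(64)
    )
}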
