From ad6334606794d44686ee36a05d38114284469dbc Mon Sep 17 00:00:00 2001 From: Ian Streeter Date: Tue, 22 Mar 2022 23:57:57 +0000 Subject: [PATCH] common: Recover from corrupt asset downloads (close #573) --- .../snowplow/enrich/common/fs2/Assets.scala | 45 +++++++++++-------- 1 file changed, 26 insertions(+), 19 deletions(-) diff --git a/modules/common-fs2/src/main/scala/com/snowplowanalytics/snowplow/enrich/common/fs2/Assets.scala b/modules/common-fs2/src/main/scala/com/snowplowanalytics/snowplow/enrich/common/fs2/Assets.scala index 6946a9370..eb5dcb1d8 100644 --- a/modules/common-fs2/src/main/scala/com/snowplowanalytics/snowplow/enrich/common/fs2/Assets.scala +++ b/modules/common-fs2/src/main/scala/com/snowplowanalytics/snowplow/enrich/common/fs2/Assets.scala @@ -76,22 +76,24 @@ object Assets { _ <- sem.release } yield State(hashes, clients) - def build[F[_]: Concurrent: Timer: ContextShift]( + def build[F[_]: ConcurrentEffect: Timer: ContextShift]( blocker: Blocker, clients: Clients[F], assets: List[Asset] ): F[Map[URI, Hash]] = - Logger[F].info("Initializing (downloading) enrichments assets") *> - buildFromLocal(blocker, assets) - .flatMap { hashes => - hashes.traverse { - case (uri, path, Some(hash)) => - Logger[F].info(s"Asset from $uri is found on local system at $path").as(uri -> hash) - case (uri, path, None) => - downloadAndHash[F](blocker, clients, uri, Paths.get(path)).map(hash => uri -> hash) - } - } - .map(_.toMap) + for { + _ <- Logger[F].info("Initializing (downloading) enrichments assets") + curDir <- getCurDir + hashOpts <- buildFromLocal(blocker, assets) + hashes <- hashOpts.traverse { + case (uri, path, Some(hash)) => + Logger[F].info(s"Asset from $uri is found on local system at $path").as(uri -> hash) + case (uri, path, None) => + download[F](blocker, curDir, clients, (uri, path)).use { a => + move(blocker, a.tpmPath, a.finalPath, List(StandardCopyOption.REPLACE_EXISTING)).as(uri -> a.hash) + } + } + } yield hashes.toMap def buildFromLocal[F[_]: Sync: ContextShift](blocker: Blocker, assets: List[Asset]): F[List[(URI, String, Option[Hash])]] = assets.traverse { case (uri, path) => local[F](blocker, path).map(hash => (uri, path, hash)) } @@ -189,13 +191,18 @@ object Assets { clients: Clients[F], assets: List[Asset] ): Resource[F, List[Downloaded]] = - assets - .traverse { a => - tempFileResource[F](blocker, dir).evalMap { tmpPath => - downloadAndHash(blocker, clients, a._1, tmpPath) - .map(hash => Downloaded(a._1, tmpPath, Paths.get(a._2), hash)) - } - } + assets.traverse(download(blocker, dir, clients, _)) + + def download[F[_]: ConcurrentEffect: ContextShift: Timer]( + blocker: Blocker, + dir: Path, + clients: Clients[F], + asset: Asset + ): Resource[F, Downloaded] = + tempFileResource[F](blocker, dir).evalMap { tmpPath => + downloadAndHash(blocker, clients, asset._1, tmpPath) + .map(hash => Downloaded(asset._1, tmpPath, Paths.get(asset._2), hash)) + } /** * Compares the hashes of downloaded assets with existing ones and keeps the different ones.