Skip to content

Commit

Permalink
common: Recover from corrupt asset downloads (close #573)
Browse files Browse the repository at this point in the history
  • Loading branch information
istreeter committed Mar 23, 2022
1 parent 8c04865 commit ad63346
Showing 1 changed file with 26 additions and 19 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -76,22 +76,24 @@ object Assets {
_ <- sem.release
} yield State(hashes, clients)

def build[F[_]: Concurrent: Timer: ContextShift](
def build[F[_]: ConcurrentEffect: Timer: ContextShift](
blocker: Blocker,
clients: Clients[F],
assets: List[Asset]
): F[Map[URI, Hash]] =
Logger[F].info("Initializing (downloading) enrichments assets") *>
buildFromLocal(blocker, assets)
.flatMap { hashes =>
hashes.traverse {
case (uri, path, Some(hash)) =>
Logger[F].info(s"Asset from $uri is found on local system at $path").as(uri -> hash)
case (uri, path, None) =>
downloadAndHash[F](blocker, clients, uri, Paths.get(path)).map(hash => uri -> hash)
}
}
.map(_.toMap)
for {
_ <- Logger[F].info("Initializing (downloading) enrichments assets")
curDir <- getCurDir
hashOpts <- buildFromLocal(blocker, assets)
hashes <- hashOpts.traverse {
case (uri, path, Some(hash)) =>
Logger[F].info(s"Asset from $uri is found on local system at $path").as(uri -> hash)
case (uri, path, None) =>
download[F](blocker, curDir, clients, (uri, path)).use { a =>
move(blocker, a.tpmPath, a.finalPath, List(StandardCopyOption.REPLACE_EXISTING)).as(uri -> a.hash)
}
}
} yield hashes.toMap

def buildFromLocal[F[_]: Sync: ContextShift](blocker: Blocker, assets: List[Asset]): F[List[(URI, String, Option[Hash])]] =
assets.traverse { case (uri, path) => local[F](blocker, path).map(hash => (uri, path, hash)) }
Expand Down Expand Up @@ -189,13 +191,18 @@ object Assets {
clients: Clients[F],
assets: List[Asset]
): Resource[F, List[Downloaded]] =
assets
.traverse { a =>
tempFileResource[F](blocker, dir).evalMap { tmpPath =>
downloadAndHash(blocker, clients, a._1, tmpPath)
.map(hash => Downloaded(a._1, tmpPath, Paths.get(a._2), hash))
}
}
assets.traverse(download(blocker, dir, clients, _))

def download[F[_]: ConcurrentEffect: ContextShift: Timer](
blocker: Blocker,
dir: Path,
clients: Clients[F],
asset: Asset
): Resource[F, Downloaded] =
tempFileResource[F](blocker, dir).evalMap { tmpPath =>
downloadAndHash(blocker, clients, asset._1, tmpPath)
.map(hash => Downloaded(asset._1, tmpPath, Paths.get(asset._2), hash))
}

/**
* Compares the hashes of downloaded assets with existing ones and keeps the different ones.
Expand Down

0 comments on commit ad63346

Please sign in to comment.