Skip to content

Commit

Permalink
minor updates + scalafmt
Browse files Browse the repository at this point in the history
  • Loading branch information
MichaelViveros committed Jul 29, 2017
1 parent 11a5faa commit cd9db41
Show file tree
Hide file tree
Showing 11 changed files with 72 additions and 52 deletions.
6 changes: 5 additions & 1 deletion data/src/main/scala/ch.epfl.scala.index.data/Main.scala
Original file line number Diff line number Diff line change
@@ -1,6 +1,10 @@
package ch.epfl.scala.index.data

import bintray.{BintrayDownloadPoms, BintrayListPoms, BintrayDownloadSbtPlugins}
import bintray.{
BintrayDownloadPoms,
BintrayListPoms,
BintrayDownloadSbtPlugins
}
import cleanup.{NonStandardLib, GithubRepoExtractor}
import elastic.SeedElasticSearch
import github.GithubDownload
Expand Down
3 changes: 2 additions & 1 deletion data/src/main/scala/ch.epfl.scala.index.data/SubIndex.scala
Original file line number Diff line number Diff line change
Expand Up @@ -109,7 +109,8 @@ object SubIndex extends BintrayProtocol {
.map(bintray => write[BintraySearch](bintray))
.mkString(nl)

writeFile(destination.meta(LocalPomRepository.Bintray), filteredBintrayMeta)
writeFile(destination.meta(LocalPomRepository.Bintray),
filteredBintrayMeta)

def copyMetas(forRepo: LocalPomRepository): Unit = {
val shas = shasFor(forRepo)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -54,7 +54,8 @@ class BintrayListPoms(paths: DataPaths)(
page: PomListDownload): WSRequest = {
val query = page.lastSearchDate.fold(Seq[(String, String)]())(
after => Seq("created_after" -> (after.toLocalDateTime.toString + "Z"))
) ++ Seq("name" -> s"${page.query}*.pom", "start_pos" -> page.page.toString)
) ++ Seq("name" -> s"${page.query}*.pom",
"start_pos" -> page.page.toString)

withAuth(wsClient.url(s"$bintrayApi/search/file"))
.withQueryStringParameters(query: _*)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,10 @@ import org.slf4j.LoggerFactory

trait PlayWsDownloader {

/** Exception carrying a `message` and an optional `cause`, both forwarded
  * unchanged to the `Exception` constructor.
  *
  * The result handling visible in this trait matches
  * `Failure(e: DownloadException)` separately: such failures are logged as a
  * warning and replaced by an empty `Seq`, while any other failure is rethrown.
  *
  * @param message description of the failure; defaults to the empty string
  * @param cause   underlying throwable; defaults to `null` (`None.orNull`)
  */
case class DownloadException(private val message: String = "",
private val cause: Throwable = None.orNull)
extends Exception(message, cause)

private val log = LoggerFactory.getLogger(getClass)

implicit val system: ActorSystem
Expand Down Expand Up @@ -194,10 +198,9 @@ trait PlayWsDownloader {
): Seq[R] = {

def processItems(client: AhcWSClient, progress: ProgressBar) = {
// use minimal concurrency to avoid abuse rate limit error which is triggered
// keep concurrency low to avoid hitting the abuse rate limit, which is triggered
// by making too many calls in a short period of time, see https://github.com/scalacenter/scaladex/issues/431
val parallelism = 1
Source(toDownload).mapAsyncUnordered(parallelism) { item =>
Source(toDownload).mapAsyncUnordered(parallelism = 4) { item =>
processItem(client, item, progress)
}
}
Expand All @@ -223,10 +226,11 @@ trait PlayWsDownloader {
result.value
.map(_ match {
case Success(value) => value
case Failure(e) => {
case Failure(e: DownloadException) => {
log.warn(s"ERROR - $e")
Seq()
}
case Failure(e) => throw e
})
.getOrElse(Seq())
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -96,7 +96,8 @@ class SeedElasticSearch(paths: DataPaths)(implicit val ec: ExecutionContext)
val bulkResults = Await.result(esClient.execute {
bulk(
group.map(
release => indexInto(indexName / releasesCollection).source(release)
release =>
indexInto(indexName / releasesCollection).source(release)
)
)
}, Duration.Inf)
Expand All @@ -116,7 +117,8 @@ class SeedElasticSearch(paths: DataPaths)(implicit val ec: ExecutionContext)
val bulkResults = Await.result(esClient.execute {
bulk(
group.map(
project => indexInto(indexName / projectsCollection).source(project)
project =>
indexInto(indexName / projectsCollection).source(project)
)
)
}, Duration.Inf)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -30,8 +30,6 @@ import java.nio.file.{Files, Path}
import com.typesafe.config.ConfigFactory
import org.slf4j.LoggerFactory

import scala.concurrent.ExecutionContext.Implicits.global

class GithubDownload(paths: DataPaths,
privateCredentials: Option[Credentials] = None)(
implicit val system: ActorSystem,
Expand All @@ -40,6 +38,8 @@ class GithubDownload(paths: DataPaths,

private val log = LoggerFactory.getLogger(getClass)

import system.dispatcher

import Json4s._

case class PaginatedGithub(repo: GithubRepo, page: Int)
Expand All @@ -65,7 +65,9 @@ class GithubDownload(paths: DataPaths,
if (0 == creds.length) {
sys.error("Need at least 1 GitHub token, see CONTRIBUTING.md#GitHub")
} else if (2 < creds.length) {
sys.error("Can only use maximum of 2 GitHub tokens, see CONTRIBUTING.md#GitHub")
sys.error(
"Can only use maximum of 2 GitHub tokens, see CONTRIBUTING.md#GitHub"
)
} else {
creds
}
Expand Down Expand Up @@ -340,7 +342,8 @@ class GithubDownload(paths: DataPaths,
private def processIssuesResponse(project: Project,
response: WSResponse): Try[Unit] = {

checkGithubApiError(s"Processing Issues for ${project.githubRepo}", response)
checkGithubApiError(s"Processing Issues for ${project.githubRepo}",
response)
.map(_ => {
saveJson(githubRepoIssuesPath(paths, project.githubRepo),
project.githubRepo,
Expand All @@ -358,56 +361,52 @@ class GithubDownload(paths: DataPaths,

val rateLimitRemaining =
response.header("X-RateLimit-Remaining").getOrElse("-1").toInt
if (0 == rateLimitRemaining) {
pauseRateLimitReset()
if (200 == response.status && 100 > rateLimitRemaining) {
pauseRateLimitReset(response)
} else if (403 == response.status) {
val retryAfter = response.header("ResetAt").getOrElse("60").toInt
throw new Exception(
s" $message, hit Github API Abuse Rate Limit by making too many calls in a small amount of time, try again after $retryAfter s")
if (rateLimitRemaining == 0) {
val reset = new DateTime(
response.header("X-RateLimit-Reset").getOrElse("0").toLong * 1000
)
throw new DownloadException(
s" $message, Hit Github API Rate Limit by running out of API calls, try again when rate limits reset at $reset"
)

} else {
val retryAfter = response.header("ResetAt").getOrElse("60").toInt
throw new DownloadException(
s" $message, Hit Github API Abuse Rate Limit by making too many calls in a small amount of time, try again after $retryAfter s"
)
}
} else if (200 != response.status && 404 != response.status && 500 != response.status && 204 != response.status) {
// 200: valid response
// 404: old repo that no longer exists
// 500: returned for one repo by the community profile API (error on GitHub's side), https://api.github.com/repos/spacelift/amqp-scala-client/community/profile
// 204: returned when getting contributors for an empty repo, https://api.github.com/repos/rockjam/cbt-sonatype/contributors?page=1
throw new Exception(
throw new DownloadException(
s" $message, Unknown response from Github API, ${response.status}, ${response.body}"
)
}

}

private def pauseRateLimitReset() = {

// note: only have to compare REST API for both tokens since only hitting GraphQL API for topics
// so won't hit rate limit but if you hit GraphQL API for more things, you would
// need to check rate limit reset times for GraphQL API as well and pick max one

val client = wsClient
val baseRequest = client.url("https://api.github.com")
.addHttpHeaders("Accept" -> "application/json")
private def pauseRateLimitReset(response: WSResponse) = {

val request1 = baseRequest.addHttpHeaders("Authorization" -> s"bearer ${credential(0)}")
val response1 = Await.result(request1.get, Duration.Inf)
val reset1 = new DateTime(
response1.header("X-RateLimit-Reset").getOrElse("0").toLong * 1000)

val reset2 =
if (credential.length == 2) {
val request2 = baseRequest.addHttpHeaders("Authorization" -> s"bearer ${credential(1)}")
val response2 = Await.result(request2.get, Duration.Inf)
new DateTime(
response2.header("X-RateLimit-Reset").getOrElse("0").toLong * 1000)
} else {
reset1
}

client.close()
val reset = new DateTime(
response.header("X-RateLimit-Reset").getOrElse("0").toLong * 1000
)

val maxReset = if (reset1.compareTo(reset2) > 0) reset1 else reset2
val milliseconds = maxReset.getMillis - DateTime.now().getMillis
// wait an additional one-minute buffer so that threads do not resume too early,
// conclude that the rate limit has not reset yet, and pause again
val oneMinBuffer = 60 * 1000
val milliseconds = reset.getMillis - DateTime
.now()
.getMillis + oneMinBuffer
val seconds = milliseconds / 1000
val minutes = seconds / 60
log.info(s"PAUSING, Hit Github API rate limit so pausing for $minutes mins to wait for rate limits for tokens to reset at $maxReset")
log.info(
s"PAUSING, About to hit Github API rate limit so pausing for $minutes mins to wait for rate limits for tokens to reset at $reset"
)
Thread.sleep(milliseconds)
log.info(s"RESUMING")
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -354,7 +354,8 @@ class ProjectConvert(paths: DataPaths) extends BintrayProtocol {
github = github,
artifacts = releaseOptions.map(_.artifacts.sorted).getOrElse(Nil),
releaseCount = releaseCount,
defaultArtifact = releaseOptions.map(_.release.reference.artifact),
defaultArtifact =
releaseOptions.map(_.release.reference.artifact),
created = min,
updated = max
)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -93,7 +93,11 @@ class DefaultReleaseTests extends FunSpec {
)

val result =
DefaultRelease(repository, ReleaseSelection.empty, releases, None, true)
DefaultRelease(repository,
ReleaseSelection.empty,
releases,
None,
true)

val versions: List[SemanticVersion] =
List(
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -133,7 +133,8 @@ class Github(implicit system: ActorSystem, materializer: ActorMaterializer)
.map(_.toList)
.map(_.collect { case (scala.util.Success(v), _) => v })
.flatMap(
s => Future.sequence(s.map(r2 => Unmarshal(r2).to[List[T]]))
s =>
Future.sequence(s.map(r2 => Unmarshal(r2).to[List[T]]))
)
.map(_.flatten)
} else Future.successful(Nil)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -121,7 +121,9 @@ private[api] class PublishProcess(paths: DataPaths,
val path = data.tempPath.getParent

val downloadParentPomsStep =
new DownloadParentPoms(LocalPomRepository.MavenCentral, paths, Some(path))
new DownloadParentPoms(LocalPomRepository.MavenCentral,
paths,
Some(path))

downloadParentPomsStep.run()

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -211,7 +211,8 @@ class RelevanceTest
private def compare(
params: SearchParams,
expected: List[(String, String)],
assertFun: (List[Project.Reference], List[Project.Reference]) => Assertion
assertFun: (List[Project.Reference],
List[Project.Reference]) => Assertion
): Future[Assertion] = {

val expectedRefs = expected.map {
Expand Down

0 comments on commit cd9db41

Please sign in to comment.