Skip to content

Commit

Permalink
#1156 Fix analyzer timeout
Browse files Browse the repository at this point in the history
  • Loading branch information
To-om committed Apr 6, 2020
1 parent 2e28aa2 commit 9bc3609
Show file tree
Hide file tree
Showing 2 changed files with 16 additions and 5 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -330,15 +330,22 @@ class CortexAnalyzerSrv @Inject()(
.set("status", JobStatus.Failure.toString)
.set("endDate", Json.toJson(new Date))
update(jobId, jobFields)
case _ if maxRetryOnError > 0
logger.debug(s"Request of status of job $cortexJobId in cortex ${cortex.name} fails, restarting ...")
/* Workaround */
case CortexError(500, _, body) if Try((Json.parse(body) \ "type").as[String]) == Success("akka.pattern.AskTimeoutException")
logger.debug("Got a 500 Timeout, retry")
updateJobWithCortex(jobId, cortexJobId, cortex)
case e if maxRetryOnError > 0
logger.debug(s"Request of status of job $cortexJobId in cortex ${cortex.name} fails, restarting ...", e)
val result = Promise[Job]
system.scheduler.scheduleOnce(retryDelay) {
updateJobWithCortex(jobId, cortexJobId, cortex, retryDelay, maxRetryOnError - 1).onComplete(result.complete)
}
result.future
case _
logger.error(s"Request of status of job $cortexJobId in cortex ${cortex.name} fails and the number of errors reaches the limit, aborting")
case e
logger.error(
s"Request of status of job $cortexJobId in cortex ${cortex.name} fails and the number of errors reaches the limit, aborting",
e
)
update(
jobId,
Fields
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -165,7 +165,11 @@ class CortexClient(val name: String, baseUrl: String, authentication: Option[Cor
request(s"api/analyzer/type/$dataType", _.get, _.json.as[Seq[Analyzer]]).map(_.map(_.copy(cortexIds = List(name))))

def waitReport(jobId: String, atMost: Duration)(implicit ec: ExecutionContext): Future[JsObject] =
request(s"api/job/$jobId/waitreport", _.withQueryStringParameters("atMost" atMost.toString).get, _.json.as[JsObject])
request(
s"api/job/$jobId/waitreport",
_.withQueryStringParameters("atMost" atMost.toString).withRequestTimeout(atMost + 1.second).get,
_.json.as[JsObject]
)

def getVersion()(implicit ec: ExecutionContext): Future[Option[String]] =
request("api/status", _.get, identity)
Expand Down

0 comments on commit 9bc3609

Please sign in to comment.