Skip to content

Commit

Permalink
Performance issue with large number of execution (#303)
Browse files Browse the repository at this point in the history
* Fix submission of large number of executions
* Fix IO in STM
  • Loading branch information
guillaumebort authored and vguerci committed Sep 26, 2018
1 parent 5772919 commit f8dfe6d
Show file tree
Hide file tree
Showing 3 changed files with 18 additions and 12 deletions.
3 changes: 2 additions & 1 deletion build.sbt
Original file line number Diff line number Diff line change
Expand Up @@ -253,7 +253,8 @@ lazy val examples =
.settings(
publishArtifact := false,
fork in Test := true,
connectInput in Test := true
connectInput in Test := true,
javaOptions ++= Seq("-Xmx256m", "-XX:+HeapDumpOnOutOfMemoryError"),
)
.settings(
Option(System.getProperty("generateExamples"))
Expand Down
15 changes: 8 additions & 7 deletions core/src/main/scala/com/criteo/cuttle/ExecutionStreams.scala
Original file line number Diff line number Diff line change
Expand Up @@ -58,22 +58,23 @@ private[cuttle] object ExecutionStreams {
h.map(_._1)
}
maybeWriter.getOrElse {
val (w, toClose) = atomic { implicit tx =>
val w =
new PrintWriter(new BufferedWriter(new OutputStreamWriter(new FileOutputStream(logFile(id), true), "utf8")))
val writer = new PrintWriter(new BufferedWriter(new OutputStreamWriter(new FileOutputStream(logFile(id), true), "utf8")))
val toClose: Seq[PrintWriter] = atomic { implicit tx =>
val toClose = if (openHandles.size > maxHandles) {
val toClear = openHandles.toSeq.sortBy(_._2._2).take(openHandles.size - maxHandles + 1).map(_._1)
toClear.map { id =>
val writerToClose = openHandles(id)._1
openHandles -= id
writerToClose
}
} else Nil
openHandles += (id -> (w -> now))
(w, toClose)
} else {
Nil
}
openHandles += (id -> (writer -> now))
toClose
}
toClose.foreach(_.close())
w
writer
}
}

Expand Down
12 changes: 8 additions & 4 deletions core/src/main/scala/com/criteo/cuttle/Executor.scala
Original file line number Diff line number Diff line change
Expand Up @@ -827,16 +827,20 @@ class Executor[S <: Scheduling] private[cuttle] (val platforms: Seq[ExecutionPla
case object Paused extends NewExecution
case class Throttled(launchDate: Instant) extends NewExecution

val index: Map[(Job[S], S#Context),(Execution[S], Future[Completed])] = runningState.single.map {
case (execution, future) =>
((execution.job, execution.context), (execution, future))
}.toMap
val existingOrNew
: Seq[Either[(Execution[S], Future[Completed]), (Job[S], Execution[S], Promise[Completed], NewExecution)]] =
atomic { implicit txn =>
if (isShuttingDown()) {
Seq.empty
} else
all.map {
case (job, context) =>
val maybeAlreadyRunning: Option[(Execution[S], Future[Completed])] =
runningState.find { case (e, _) => e.job == job && e.context == context }
all.distinct.zipWithIndex.map {
case ((job, context), i) =>
if(i > 1000 && i % 1000 == 0) logger.info(s"Submitted ${i}/${all.size} jobs")
val maybeAlreadyRunning: Option[(Execution[S], Future[Completed])] = index.get((job, context))

lazy val maybePaused: Option[(Execution[S], Future[Completed])] = pausedState
.get(job.id)
Expand Down

0 comments on commit f8dfe6d

Please sign in to comment.