diff --git a/build.sbt b/build.sbt index 21bbccb29..b4a171ae9 100644 --- a/build.sbt +++ b/build.sbt @@ -253,7 +253,8 @@ lazy val examples = .settings( publishArtifact := false, fork in Test := true, - connectInput in Test := true + connectInput in Test := true, + javaOptions ++= Seq("-Xmx256m", "-XX:+HeapDumpOnOutOfMemoryError"), ) .settings( Option(System.getProperty("generateExamples")) diff --git a/core/src/main/scala/com/criteo/cuttle/ExecutionStreams.scala b/core/src/main/scala/com/criteo/cuttle/ExecutionStreams.scala index 556f7f955..533d70dda 100644 --- a/core/src/main/scala/com/criteo/cuttle/ExecutionStreams.scala +++ b/core/src/main/scala/com/criteo/cuttle/ExecutionStreams.scala @@ -58,9 +58,8 @@ private[cuttle] object ExecutionStreams { h.map(_._1) } maybeWriter.getOrElse { - val (w, toClose) = atomic { implicit tx => - val w = - new PrintWriter(new BufferedWriter(new OutputStreamWriter(new FileOutputStream(logFile(id), true), "utf8"))) + val writer = new PrintWriter(new BufferedWriter(new OutputStreamWriter(new FileOutputStream(logFile(id), true), "utf8"))) + val toClose: Seq[PrintWriter] = atomic { implicit tx => val toClose = if (openHandles.size > maxHandles) { val toClear = openHandles.toSeq.sortBy(_._2._2).take(openHandles.size - maxHandles + 1).map(_._1) toClear.map { id => @@ -68,12 +67,14 @@ private[cuttle] object ExecutionStreams { openHandles -= id writerToClose } - } else Nil - openHandles += (id -> (w -> now)) - (w, toClose) + } else { + Nil + } + openHandles += (id -> (writer -> now)) + toClose } toClose.foreach(_.close()) - w + writer } } diff --git a/core/src/main/scala/com/criteo/cuttle/Executor.scala b/core/src/main/scala/com/criteo/cuttle/Executor.scala index 55d1ffd07..721469336 100644 --- a/core/src/main/scala/com/criteo/cuttle/Executor.scala +++ b/core/src/main/scala/com/criteo/cuttle/Executor.scala @@ -802,16 +802,20 @@ class Executor[S <: Scheduling] private[cuttle] (val platforms: Seq[ExecutionPla case object Paused extends NewExecution case class Throttled(launchDate: Instant) extends NewExecution + val index: Map[(Job[S], S#Context),(Execution[S], Future[Completed])] = runningState.single.map { + case (execution, future) => + ((execution.job, execution.context), (execution, future)) + }.toMap val existingOrNew : Seq[Either[(Execution[S], Future[Completed]), (Job[S], Execution[S], Promise[Completed], NewExecution)]] = atomic { implicit txn => if (isShuttingDown()) { Seq.empty } else - all.map { - case (job, context) => - val maybeAlreadyRunning: Option[(Execution[S], Future[Completed])] = - runningState.find { case (e, _) => e.job == job && e.context == context } + all.distinct.zipWithIndex.map { + case ((job, context), i) => + if(i > 1000 && i % 1000 == 0) logger.info(s"Submitted ${i}/${all.size} jobs") + val maybeAlreadyRunning: Option[(Execution[S], Future[Completed])] = index.get((job, context)) lazy val maybePaused: Option[(Execution[S], Future[Completed])] = pausedState .get(job.id)