From 083e17a43f2f68a2fc57ae2328733db35857c8d0 Mon Sep 17 00:00:00 2001 From: Guillaume Bort Date: Thu, 27 Jul 2017 10:57:59 +0200 Subject: [PATCH] Add `execAndRetrieveOutput` to fork a process and retrieve its output. (#133) --- build.sbt | 2 +- .../platforms/local/LocalPlatform.scala | 24 ++++++++++++++++--- 2 files changed, 22 insertions(+), 4 deletions(-) diff --git a/build.sbt b/build.sbt index 13ce3cd37..ea8e55af1 100644 --- a/build.sbt +++ b/build.sbt @@ -1,7 +1,7 @@ val devMode = settingKey[Boolean]("Some build optimization are applied in devMode.") val writeClasspath = taskKey[File]("Write the project classpath to a file.") -val VERSION = "0.1.7" +val VERSION = "0.1.8" lazy val commonSettings = Seq( organization := "com.criteo.cuttle", diff --git a/core/src/main/scala/com/criteo/cuttle/platforms/local/LocalPlatform.scala b/core/src/main/scala/com/criteo/cuttle/platforms/local/LocalPlatform.scala index 03e5ba851..28b8ae3d2 100644 --- a/core/src/main/scala/com/criteo/cuttle/platforms/local/LocalPlatform.scala +++ b/core/src/main/scala/com/criteo/cuttle/platforms/local/LocalPlatform.scala @@ -28,7 +28,11 @@ object LocalPlatform { class LocalProcess(command: String) { val id = UUID.randomUUID().toString - def exec[S <: Scheduling](env: Map[String, String] = sys.env)(implicit execution: Execution[S]): Future[Unit] = { + private def exec0[S <: Scheduling]( + env: Map[String, String] = sys.env, + outLogger: (String) => Unit, + errLogger: (String) => Unit + )(implicit execution: Execution[S]): Future[Unit] = { val streams = execution.streams streams.debug(s"Forking:") streams.debug(this.toString) @@ -43,12 +47,16 @@ class LocalProcess(command: String) { override def onStdout(buffer: ByteBuffer, closed: Boolean) = { val bytes = Array.ofDim[Byte](buffer.remaining) buffer.get(bytes) - streams.info(new String(bytes)) + val str = new String(bytes) + streams.info(str) + outLogger(str) } override def onStderr(buffer: ByteBuffer, closed: Boolean) = { val bytes = Array.ofDim[Byte](buffer.remaining) buffer.get(bytes) - streams.error(new String(bytes)) + val str = new String(bytes) + streams.error(str) + errLogger(str) } override def onExit(statusCode: Int) = statusCode match { @@ -68,5 +76,15 @@ class LocalProcess(command: String) { result.future } } + + def exec[S <: Scheduling](env: Map[String, String] = sys.env)(implicit execution: Execution[S]): Future[Unit] = + exec0(env, _ => (), _ => ()) + + def execAndRetrieveOutput[S <: Scheduling](env: Map[String, String] = sys.env)(implicit execution: Execution[S]): Future[(String,String)] = { + val out = new StringBuffer + val err = new StringBuffer + exec0(env, x => out.append(x), x => err.append(x)).map(_ => (out.toString, err.toString)) + } + override def toString = command }