Skip to content

Commit

Permalink
Adds a way of getting ExecutionStream from threadName (#302)
Browse files Browse the repository at this point in the history
* [EXPERIMENTAL] register execution threads when running

* [EXPERIMENTAL] Example for slf4j/log4j redirection

* Updates and removes log4j specific code
  • Loading branch information
dufrannea authored Oct 5, 2018
1 parent 9c52a2d commit 73a0f12
Show file tree
Hide file tree
Showing 3 changed files with 46 additions and 5 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -24,7 +24,7 @@ trait ExecutionStreams {
/** Output debug messages (usually used by the [[ExecutionPlatform]]) */
def debug(str: CharSequence = ""): Unit = this.writeln("DEBUG", str)

private def writeln(tag: String, str: CharSequence): Unit = {
private[cuttle] def writeln(tag: String, str: CharSequence): Unit = {
val time = Instant.now.toString
str.toString.split("\n").foreach(l => this.writeln(s"$time $tag - $l"))
}
Expand Down
35 changes: 31 additions & 4 deletions core/src/main/scala/com/criteo/cuttle/Executor.scala
100644 → 100755
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@ package com.criteo.cuttle

import java.io.{PrintWriter, StringWriter}
import java.time.{Duration, Instant, ZoneId}
import java.util.concurrent.ConcurrentHashMap
import java.util.concurrent.atomic.AtomicBoolean
import java.util.{Timer, TimerTask}

Expand Down Expand Up @@ -376,6 +377,15 @@ private[cuttle] object ExecutionPlatform {
platforms.find(classTag[E].runtimeClass.isInstance).map(_.asInstanceOf[E])
}

private[cuttle] object Executor {

// we save a mapping of ThreadName -> ExecutionStreams to be able to redirect logs comming
// form different SideEffect (Futures) to the corresponding ExecutionStreams
private val threadNamesToStreams = new ConcurrentHashMap[String, ExecutionStreams]

def getStreams(threadName: String): Option[ExecutionStreams] = Option(threadNamesToStreams.get(threadName))
}

/** An [[Executor]] is responsible to actually execute the [[SideEffect]] functions for the
* given [[Execution Executions]]. */
class Executor[S <: Scheduling] private[cuttle] (val platforms: Seq[ExecutionPlatform],
Expand Down Expand Up @@ -860,16 +870,33 @@ class Executor[S <: Scheduling] private[cuttle] (val platforms: Seq[ExecutionPla
.toLeft {
val nextExecutionId = utils.randomUUID

val streams = new ExecutionStreams {
def writeln(str: CharSequence) = ExecutionStreams.writeln(nextExecutionId, str)
}

// wrap the execution context so that we can register the name of the thread of each
// runnable (and thus future) that will be run by the side effect.
val sideEffectExecutionContext = SideEffectThreadPool.wrap(runnable => new Runnable {
override def run(): Unit = {
val tName = Thread.currentThread().getName
Executor.threadNamesToStreams.put(tName, streams)
try {
runnable.run()
}
finally {
Executor.threadNamesToStreams.remove(tName)
}
}
})(Implicits.sideEffectThreadPool)

val execution = Execution(
id = nextExecutionId,
job,
context,
streams = new ExecutionStreams {
def writeln(str: CharSequence) = ExecutionStreams.writeln(nextExecutionId, str)
},
streams = streams,
platforms,
projectVersion
)
)(sideEffectExecutionContext)
val promise = Promise[Completed]

if (pausedState.contains(job.id)) {
Expand Down
14 changes: 14 additions & 0 deletions core/src/main/scala/com/criteo/cuttle/ThreadPools.scala
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,20 @@ object ThreadPools {
// dedicated threadpool to start new executions and run user-defined side effects
sealed trait SideEffectThreadPool extends WrappedThreadPool with Metrics

object SideEffectThreadPool {
def wrap(wrapRunnable: Runnable => Runnable)(implicit executionContext: SideEffectThreadPool): SideEffectThreadPool = {
new SideEffectThreadPool {
private val delegate = executionContext.underlying

override val underlying: ExecutionContext = new ExecutionContext {
override def execute(runnable: Runnable): Unit = delegate.execute(wrapRunnable(runnable))
override def reportFailure(cause: Throwable): Unit = delegate.reportFailure(cause)
}

}
}
}

// The implicitly provided execution contexts use fixed thread pools.
// These thread pool default sizes are overridable with Java system properties, passing -D<property_name> <value> flags when you start the JVM
object ThreadPoolSystemProperties extends Enumeration {
Expand Down

0 comments on commit 73a0f12

Please sign in to comment.