-
Notifications
You must be signed in to change notification settings - Fork 3
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Initial structure and parallelization of parser #66
base: 2.13.x
Are you sure you want to change the base?
Initial structure and parallelization of parser #66
Conversation
8ddd732
to
d467b59
Compare
@@ -953,7 +996,9 @@ class Global(var currentSettings: Settings, reporter0: Reporter) | |||
* of what file was being compiled when it broke. Since I really | |||
* really want to know, this hack. | |||
*/ | |||
protected var lastSeenSourceFile: SourceFile = NoSourceFile | |||
protected var _lastSeenSourceFile: ThreadIdentityAwareThreadLocal[SourceFile] = ThreadIdentityAwareThreadLocal(NoSourceFile) |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
private[this] final val ?
@@ -953,7 +996,9 @@ class Global(var currentSettings: Settings, reporter0: Reporter) | |||
* of what file was being compiled when it broke. Since I really | |||
* really want to know, this hack. | |||
*/ | |||
protected var lastSeenSourceFile: SourceFile = NoSourceFile | |||
protected var _lastSeenSourceFile: ThreadIdentityAwareThreadLocal[SourceFile] = ThreadIdentityAwareThreadLocal(NoSourceFile) | |||
@inline protected def lastSeenSourceFile: SourceFile = _lastSeenSourceFile.get |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
it will need to be final to @inline
Should be final anyway I think
The pattern is that we replace
<prot> var <name>: <type>
with
private[this] final _<name> = new ThreadIdentityAwareThreadLocal[<type>](...)
<prot> final def <name> : <type> = _<name>.get
<prot> final def <name>_=(newValue: <type>) = _<name>.set(newValue)
private var phasec: Int = 0 // phases completed | ||
private var unitc: Int = 0 // units completed this phase | ||
private var phasec: Int = 0 // phases completed | ||
private final val unitc: AtomicInteger = new AtomicInteger(0) // units completed this phase |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I think that can be a Parallel.Counter
and AtomicInteger, with get, increment (or maybe +=) and reset
we can also add assertions on the access in the same way
// If we are on main thread it means there are no worker threads at all. | ||
// That in turn means we were already using main reporter all the time, so there is nothing more to do. | ||
// Otherwise we have to forward messages from worker thread reporter to main one. | ||
reporter match { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
this doesn't work - we need to report in order - that was why I had the local data for this
we should move this to a relay method on the StoreReporter I think, or a new RelayingReporter or some such
def apply[T](valueOnWorker: => T, valueOnMain: => T) = new ThreadIdentityAwareThreadLocal[T](valueOnWorker, valueOnMain) | ||
} | ||
|
||
// `ThreadIdentityAwareThreadLocal` allows us to have different (sub)type of values on main and worker threads. |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I think that we have 3 modes
admin - running setup and cleardown on the main thread
parallel-worker
single-worker
It may be easier to have this single worker on a seperate single thread to keep it simple
The ThreadFactory used for the async executor can provide a subclass of Thread - ParallelWorkerThread - it makes this switch easier
@@ -0,0 +1,39 @@ | |||
package scala.tools.nsc.util | |||
|
|||
object ThreadIdentityAwareThreadLocal { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Maybe a object Parallel, with all of the Parallel compilation extras in one place?
@@ -2535,7 +2538,9 @@ trait Trees { self: Universe => | |||
val treeCopy: TreeCopier = newLazyTreeCopier | |||
|
|||
/** The current owner symbol. */ | |||
protected[scala] var currentOwner: Symbol = rootMirror.RootClass | |||
protected[scala] def currentOwner: Symbol = _currentOwner.get |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
make these final pls
@@ -27,7 +27,7 @@ trait Names extends api.Names { | |||
// detect performance regressions. | |||
// | |||
// Discussion: https://groups.google.com/forum/#!search/biased$20scala-internals/scala-internals/0cYB7SkJ-nM/47MLhsgw8jwJ | |||
protected def synchronizeNames: Boolean = false | |||
protected def synchronizeNames: Boolean = true |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
do we need this?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Yes we do.
Umad reports violation in this class:
[WARN] Method accessed from multiple threads (main, scalac-parser-worker-1): scala.reflect.internal.SymbolTable.scala$reflect$internal$Names$$nc_$eq(int)
scala.reflect.internal.SymbolTable.scala$reflect$internal$Names$$nc_$eq(SymbolTable.scala)
scala.reflect.internal.Names.enterChars(Names.scala:78)
scala.reflect.internal.Names.body$1(Names.scala:116)
scala.reflect.internal.Names.newTermName(Names.scala:127)
scala.reflect.internal.Names.newTermName$(Names.scala:96)
scala.reflect.internal.SymbolTable.newTermName(SymbolTable.scala:18)
scala.reflect.internal.Names.newTermName(Names.scala:83)
scala.reflect.internal.Names.newTermName$(Names.scala:82)
scala.reflect.internal.SymbolTable.newTermName(SymbolTable.scala:18)
scala.reflect.internal.Names.newTermName(Names.scala:85)
What synchronizeNames
do is enabling synchronization on the newTermName
.
It looks like something designed exactly for our use case so I happily used it.
There were some concerns regarding enabling it globally though (mostly performance ones): https://groups.google.com/forum/#!search/biased$20scala-internals/scala-internals/0cYB7SkJ-nM/47MLhsgw8jwJ
@@ -79,6 +79,8 @@ import util.Position | |||
abstract class Reporter { | |||
protected def info0(pos: Position, msg: String, severity: Severity, force: Boolean): Unit | |||
|
|||
def infoRaw(pos: Position, msg: String, severity: Severity, force: Boolean): Unit = info0(pos, msg, severity, force) |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
is this just for relay?
@@ -3116,8 +3116,10 @@ trait Symbols extends api.Symbols { self: SymbolTable => | |||
* type arguments. | |||
*/ | |||
override def tpe_* : Type = { | |||
maybeUpdateTypeCache() | |||
tpeCache | |||
this.synchronized { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
is this safe - can we cross symbols with this lock?
I dont see how, but there are checks for cycles, so if there are it implies crossing
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I think we need to discuss this.
Stack trace for the reference:
[WARN] Method accessed from multiple threads (scalac-parser-worker-3, scalac-parser-worker-2): scala.reflect.internal.Symbols$TypeSymbol.tpeCache_$eq(scala.reflect.internal.Types$Type)
scala.reflect.internal.Symbols$TypeSymbol.tpeCache_$eq(Symbols.scala)
scala.reflect.internal.Symbols$TypeSymbol.updateTypeCache(Symbols.scala:3155)
scala.reflect.internal.Symbols$TypeSymbol.maybeUpdateTypeCache(Symbols.scala:3143)
scala.reflect.internal.Symbols$TypeSymbol.tpe_$times(Symbols.scala:3120)
scala.reflect.internal.Symbols$Symbol.tpe(Symbols.scala:1463)
scala.tools.nsc.javac.JavaParsers$JavaParser.addAnnot$1(JavaParsers.scala:350)
scala.tools.nsc.javac.JavaParsers$JavaParser.modifiers(JavaParsers.scala:386)
scala.tools.nsc.javac.JavaParsers$JavaParser.typeBodyDecls(JavaParsers.scala:738)
scala.tools.nsc.javac.JavaParsers$JavaParser.typeBody(JavaParsers.scala:728)
scala.tools.nsc.javac.JavaParsers$JavaParser.classDecl(JavaParsers.scala:700)
scala.tools.nsc.javac.JavaParsers$JavaParser.$anonfun$typeDecl$3(JavaParsers.scala:868)
scala.tools.nsc.javac.JavaParsers$JavaParser$$Lambda$568/954745645.apply(Unknown Source)
scala.tools.nsc.javac.JavaParsers$JavaParser.joinComment(JavaParsers.scala:138)
scala.tools.nsc.javac.JavaParsers$JavaParser.typeDecl(JavaParsers.scala:868)
scala.tools.nsc.javac.JavaParsers$JavaParser.compilationUnit(JavaParsers.scala:916)
scala.tools.nsc.javac.JavaParsers$JavaParser.parse(JavaParsers.scala:49)
scala.tools.nsc.ast.parser.SyntaxAnalyzer.scala$tools$nsc$ast$parser$SyntaxAnalyzer$$initialUnitBody(SyntaxAnalyzer.scala:85)
scala.tools.nsc.ast.parser.SyntaxAnalyzer$ParserPhase.apply(SyntaxAnalyzer.scala:99)
scala.tools.nsc.Global$GlobalPhase.scala$tools$nsc$Global$GlobalPhase$$processUnit(Global.scala:443)
scala.tools.nsc.Global$GlobalPhase$$anonfun$1.$anonfun$applyOrElse$1(Global.scala:414)
scala.tools.nsc.Global$GlobalPhase$$anonfun$1$$Lambda$427/1917083500.apply(Unknown Source)
scala.concurrent.Future$.$anonfun$apply$1(Future.scala:608)
scala.concurrent.Future$$$Lambda$428/1526009370.apply(Unknown Source)
scala.util.Success.$anonfun$map$1(Try.scala:250)
scala.util.Success$$Lambda$431/987128726.apply(Unknown Source)
scala.util.Try$.apply(Try.scala:208)
scala.util.Success.map(Try.scala:250)
scala.concurrent.Future.$anonfun$map$1(Future.scala:244)
scala.concurrent.Future$$Lambda$429/1226711664.apply(Unknown Source)
scala.concurrent.impl.Promise.liftedTree1$1(Promise.scala:29)
scala.concurrent.impl.Promise.$anonfun$transform$1(Promise.scala:29)
scala.concurrent.impl.Promise$$Lambda$430/1319647571.apply(Unknown Source)
scala.concurrent.impl.CallbackRunnable.run(Promise.scala:60)
java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
java.lang.Thread.run(Thread.java:745)
Its looking pretty good I added some helper classes to https://github.com/rorygraves/scalac_perf/commits/mike/2.13.x_parallel_base in commit 2 that should ease some of the code I hope |
c4a8403
to
95cd8ed
Compare
95cd8ed
to
daa26ac
Compare
def reporter_=(newReporter: Reporter): Unit = | ||
currentReporter = newReporter match { | ||
currentReporter.set(newReporter match { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
assert Parallel.onMainThread?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Wouldn't be true.
We are setting new one for every unit now.
It's required to ensure consistent ordering of the logs.
try { | ||
if ((unit ne null) && unit.exists) lastSeenSourceFile = unit.source | ||
currentRun.currentUnit = unit |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
this needs to be parallel?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Unluckily yes, at least as long as we have currentUnit
.
lastSeenSourceFile
is used inside error reporting which happens inside worker thread.
If we would make it global it could report errors on incorrect files.
That said I do not understand why we need it in first place.
Comment in Global.scala says:
/** There are common error conditions where when the exception hits
* here, currentRun.currentUnit is null. This robs us of the knowledge
* of what file was being compiled when it broke. Since I really
* really want to know, this hack.
*/
protected var lastSeenSourceFile: SourceFile = NoSourceFile
But i cannot see how lastSeenSourceFile
can be set when currentRun.currentUnit
is null.
@@ -66,11 +66,11 @@ abstract class GenBCode extends SubComponent { | |||
|
|||
def apply(unit: CompilationUnit): Unit = codeGen.genUnit(unit) | |||
|
|||
override def run(): Unit = { | |||
override def wrapRun(code: => Unit): Unit = { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
why do we need to change genBcode?
@@ -2463,7 +2464,9 @@ trait Trees { self: Universe => | |||
* @group Traversal | |||
*/ | |||
class Traverser { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I wonder if all of these/some of these Traversers are thread local themselves.
Probably a bigger change thuogh and higher risk
var pos: Position | ||
private val _pos: WorkerThreadLocal[Position] = WorkerThreadLocal(NoPosition) | ||
@inline def pos: Position = _pos.get | ||
@inline def pos_=(position: Position): Unit = _pos.set(position) | ||
} | ||
protected[this] lazy val posAssigner: PosAssigner = new DefaultPosAssigner |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
why not make this a thread local. BTW it has overrides
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I thought it will require more changes exactly because of those overrides but when i tried now it turns out it's 3-lines-change. Fixed.
@@ -78,6 +78,8 @@ import util.Position | |||
*/ | |||
abstract class Reporter { | |||
protected def info0(pos: Position, msg: String, severity: Severity, force: Boolean): Unit | |||
def info1(pos: Position, msg: String, severity: Severity, force: Boolean): Unit = |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
what is this?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
With current design i still cannot access info0
.
But I can make this protected[reporters]
.
|
||
object Parallel { | ||
|
||
class WorkerThread(group: ThreadGroup, target: Runnable, name: String, |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
we need a SingleThreadedWorker as dicussed on call
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Running in the current thread is handled inside execution context in Global
in the else
block:
private def createExecutionContext(): ExecutionContextExecutor = {
if (settings.YparallelPhases.containsPhase(this)) {
val parallelThreads = settings.YparallelThreads.value
val threadPoolFactory = ThreadPoolFactory(Global.this, this)
val javaExecutor = threadPoolFactory.newUnboundedQueueFixedThreadPool(parallelThreads, "worker")
ExecutionContext.fromExecutorService(javaExecutor, _ => ())
} else ExecutionContext.fromExecutor((task: Runnable) => asWorkerThread(task.run()))
}
|
||
if (isDebugPrintEnabled) inform("[running phase " + name + " on " + currentRun.size + " compilation units]") | ||
|
||
implicit val ec: ExecutionContextExecutor = createExecutionContext() |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I wonder why can't reuse the same executionContext across all phases - to create a treadpull is not a cheap operation IIRC
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Good point, I don't see why we cant
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
no we cant - its wiring requires that we generate for each phase. The reuse or otherwise has to be managed by the async helper to get the instrumentation
def reporter_=(newReporter: Reporter): Unit = | ||
currentReporter = newReporter match { | ||
currentReporter.set(newReporter match { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
we should assert that we are on the main thread I think
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
That is not the case here, please look into processUnit
method.
currentRun.informUnitStarting(this, unit) | ||
try withCurrentUnitNoLog(unit)(task) | ||
finally currentRun.advanceUnit() | ||
def run(): Unit = { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
can we keep the methods in the same order as original
currentRun.currentUnit = unit | ||
task | ||
_synchronizeNames = isParallel |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
why are we changing this on a worker thread?
currentRun.currentUnit = unit0 | ||
currentRun.advanceUnit() | ||
_synchronizeNames = false |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
races!!!!
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Ah... you are right!
First thing i need to do at Monday is remove all excludes from my tool and replace them with some intelligent check for synchronization etc.
@@ -1058,12 +1129,6 @@ class Global(var currentSettings: Settings, reporter0: Reporter) | |||
*/ | |||
override def currentRunId = curRunId | |||
|
|||
def echoPhaseSummary(ph: Phase) = { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
what do we gain by changing this method. We need to keep the focus on the chnage, not optimisations and refactors that are orthoganal IMO
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
It's related to some extend.
I changed typer
's run
method to call super, previously it was re-implementing it.
echoPhaseSummary
was one of the methods which are no longer needed when we are not re-implementing code from the run.
/** Access from multiple threads was reported by umad. | ||
* That possibly could be solved by ensuring that every unit operates on it's own copy of the tree, | ||
* but it would require much bigger refactorings and would be more memory consuming. | ||
*/ |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Maybe a traverser captures the currentUnit, or thread, and asserts that it is local
Maybe this is a followup investigation though
/** The current owner symbol. | ||
* | ||
* Access from multiple threads was reported by umad. | ||
* That possibly could be solved by ensuring that every unit operates on it's own copy of the tree, |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Maybe a transformer captures the currentUnit, or thread, and asserts that it is local
Maybe this is a followup investigation though
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Looks good - I don't really have any comments above and beyond Mikes
// Runs block of the code in the 'worker thread' mode | ||
// All unit processing should always happen in the worker thread | ||
@inline final def asWorkerThread[T](fn: => T): T = { | ||
val previous = isWorker.get() |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
do we need this to be reenterent?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
It's not so much about being re-entrant as for cases when we have:
- main execution
- worker execution
- main execution
happening on the same thread (tests, phases with disabled parallel compilation)
// it's much easier to start with assuming everything is initially worker thread | ||
// and just mark main accordingly when needed. | ||
private val isWorker: ThreadLocal[Boolean] = new ThreadLocal[Boolean] { | ||
override def initialValue(): Boolean = true |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I am not convinced that this is a safe assumption
would it be safer to have ismanaged as a threadlocal (default false) and set to true while in global.compileUnits
this could be a enabler for the other checks
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I'm not sure if/why that would be safer.
I guess we need to discuss that.
|
||
// Asserts that current execution happens on the main thread | ||
def assertOnMain(): Unit = { | ||
assert(!isWorker.get()) |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
!ismanaged.get() &&
??
} | ||
|
||
// Wrapper for `synchronized` method. In future could provide additional logging, safety checks, etc. | ||
def synchronizeAccess[T <: Object, U](obj: T)(block: => U): U = { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
how about
class ThreadStats{
//may be useful to log from ProfilingThreadPoolFactory - ideally to perf_tester, but maybe just to console at the moment
var uncontendedAccess = 0L
var contendedAccess =0L
var waitTime = 0L
}
object ThreadStats extends ThreadLocal[ThreadStats] {
override def initialValue(): ThreadStats = new ThreadStats
}
final class ControlledAccess(name: String) {
private val lock = new ReentrantLock(false)
def apply[T] (block: => T) = {
val stats = ThreadStats.get
if (lock.tryLock())
stats.uncontendedAccess += 1
else {
val startWait = System.nanoTime()
lock.lock()
val duration = System.nanoTime() - startWait
stats.waitTime += duration
stats.contendedAccess += 1
}
try block finally lock.unlock
}
}
Looks very close 👍 |
Results from perf_tester:
Parser phase comparison:
Total comparison:
This is with
rangepos
enabled.