Skip to content

Commit

Permalink
Merge pull request #1934 from fsharp/improve_parallel_runner
Browse files Browse the repository at this point in the history
make sure to start targets as early as possible when all dependencies…
  • Loading branch information
matthid authored May 17, 2018
2 parents baec147 + 0cba748 commit 2c369e3
Showing 1 changed file with 127 additions and 4 deletions.
131 changes: 127 additions & 4 deletions src/app/Fake.Core.Target/Target.fs
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@ open System
open System.Collections.Generic
open Fake.Core
open Fake.Core.CommandLineParsing
open System.Threading.Tasks

module internal TargetCli =
let targetCli =
Expand Down Expand Up @@ -480,10 +481,131 @@ module Target =
let internal runSingleTarget (target : Target) (context:TargetContext) =
if not context.HasError then
use t = Trace.traceTarget target.Name (match target.Description with Some d -> d | _ -> "NoDescription") (dependencyString target)
runSimpleContextInternal target context
let res = runSimpleContextInternal target context
if res.HasError
then t.MarkFailed()
else t.MarkSuccess()
res
else
{ context with PreviousTargets = context.PreviousTargets @ [{ Error = None; Time = TimeSpan.Zero; Target = target; WasSkipped = true }] }

module internal ParallelRunner =
let internal mergeContext (ctx1:TargetContext) (ctx2:TargetContext) =
let known =
ctx1.PreviousTargets
|> Seq.map (fun tres -> tres.Target.Name, tres)
|> dict
let filterKnown targets =
targets
|> List.filter (fun tres -> not (known.ContainsKey tres.Target.Name))
{ ctx1 with
PreviousTargets =
ctx1.PreviousTargets @ filterKnown ctx2.PreviousTargets
}
// Centralized handling of target context and next target logic...
type RunnerHelper =
| GetNextTarget of TargetContext * AsyncReplyChannel<TargetContext * Async<Target option>>
type IRunnerHelper =
abstract GetNextTarget : TargetContext -> Async<TargetContext * Async<Target option>>
let createCtxMgr (order:Target[] list) (ctx:TargetContext) =
let body (inbox:MailboxProcessor<RunnerHelper>) = async {
let targetCount =
order |> Seq.sumBy (fun t -> t.Length)
let resolution = Set.ofSeq(order |> Seq.concat |> Seq.map (fun t -> t.Name))
let inResolution (t:string) = resolution.Contains t
let mutable ctx = ctx
let mutable waitList = []
let mutable runningTasks = []
//let mutable remainingOrders = order
try
while true do
let! msg = inbox.Receive()
match msg with
| GetNextTarget (newCtx, reply) ->
// semantic is:
// - We never return a target twice!
// - we fill up the waitlist first
ctx <- mergeContext ctx newCtx
let known =
ctx.PreviousTargets
|> Seq.map (fun tres -> tres.Target.Name, tres)
|> dict
runningTasks <-
runningTasks
|> List.filter (fun t -> not(known.ContainsKey t.Name))
if known.Count = targetCount then
for (w:System.Threading.Tasks.TaskCompletionSource<Target option>) in waitList do
w.SetResult None
waitList <- []
reply.Reply (ctx, async.Return None)
else

let isRunnable (t:Target) =
not (known.ContainsKey t.Name) && // not already finised
not (runningTasks |> Seq.exists (fun r -> r.Name = t.Name)) && // not already running
t.Dependencies @ List.filter inResolution t.SoftDependencies // all dependencies finished
|> Seq.forall (fun d -> known.ContainsKey d)
let runnable =
order
|> Seq.concat
|> Seq.filter isRunnable
|> Seq.toList

let rec getNextFreeRunableTarget (r) =
match r with
| t :: rest ->
match waitList with
| h :: restwait ->
// fill some idle worker
runningTasks <- t :: runningTasks
h.SetResult (Some t)
waitList <- restwait
getNextFreeRunableTarget rest
| [] -> Some t
| [] -> None
match getNextFreeRunableTarget runnable with
| Some free ->
runningTasks <- free :: runningTasks
reply.Reply (ctx, async.Return(Some free))
| None ->
// queue work
let tcs = new TaskCompletionSource<Target option>()
waitList <- waitList @ [ tcs ]
reply.Reply (ctx, tcs.Task |> Async.AwaitTask)
with e ->
while true do
let! msg = inbox.Receive()
match msg with
| GetNextTarget (_, reply) ->
reply.Reply (ctx, async { return raise <| exn("mailbox failed", e) })
}

let mbox = MailboxProcessor.Start(body)
{ new IRunnerHelper with
member __.GetNextTarget (ctx) = mbox.PostAndAsyncReply(fun reply -> GetNextTarget(ctx, reply))
}

let runOptimal workerNum (order:Target[] list) targetContext =
let mgr = createCtxMgr order targetContext
let targetRunner () =
async {
let! (tctx, att) = mgr.GetNextTarget(targetContext)
let! tt = att
let mutable ctx = tctx
let mutable nextTarget = tt
while nextTarget.IsSome do
let newCtx = runSingleTarget nextTarget.Value ctx
let! (tctx, att) = mgr.GetNextTarget(newCtx)
let! tt = att
ctx <- tctx
nextTarget <- tt
return ctx
} |> Async.StartAsTask
Array.init workerNum (fun _ -> targetRunner())
|> Task.WhenAll
|> Async.AwaitTask
|> Async.RunSynchronously
|> Seq.reduce mergeContext

/// Runs the given array of targets in parallel using count tasks
let internal runTargetsParallel (count : int) (targets : Target[]) context =
Expand Down Expand Up @@ -530,9 +652,10 @@ module Target =
if parallelJobs > 1 && not singleTarget then
Trace.tracefn "Running parallel build with %d workers" parallelJobs

// run every level in parallel
order
|> Seq.fold (fun context par -> runTargetsParallel parallelJobs par context) context
// always try to keep "parallelJobs" runners busy
ParallelRunner.runOptimal parallelJobs order context
//order
// |> Seq.fold (fun context par -> runTargetsParallel parallelJobs par context) context
else
let targets = order |> Seq.collect id |> Seq.toArray
let lastTarget = targets |> Array.last
Expand Down

0 comments on commit 2c369e3

Please sign in to comment.