diff --git a/src/app/Fake.Core.Target/Target.fs b/src/app/Fake.Core.Target/Target.fs index 6ce5d6c2f77..1d6556e5ed2 100644 --- a/src/app/Fake.Core.Target/Target.fs +++ b/src/app/Fake.Core.Target/Target.fs @@ -4,6 +4,7 @@ open System open System.Collections.Generic open Fake.Core open Fake.Core.CommandLineParsing +open System.Threading.Tasks module internal TargetCli = let targetCli = @@ -480,10 +481,131 @@ module Target = let internal runSingleTarget (target : Target) (context:TargetContext) = if not context.HasError then use t = Trace.traceTarget target.Name (match target.Description with Some d -> d | _ -> "NoDescription") (dependencyString target) - runSimpleContextInternal target context + let res = runSimpleContextInternal target context + if res.HasError + then t.MarkFailed() + else t.MarkSuccess() + res else { context with PreviousTargets = context.PreviousTargets @ [{ Error = None; Time = TimeSpan.Zero; Target = target; WasSkipped = true }] } + module internal ParallelRunner = + let internal mergeContext (ctx1:TargetContext) (ctx2:TargetContext) = + let known = + ctx1.PreviousTargets + |> Seq.map (fun tres -> tres.Target.Name, tres) + |> dict + let filterKnown targets = + targets + |> List.filter (fun tres -> not (known.ContainsKey tres.Target.Name)) + { ctx1 with + PreviousTargets = + ctx1.PreviousTargets @ filterKnown ctx2.PreviousTargets + } + // Centralized handling of target context and next target logic... + type RunnerHelper = + | GetNextTarget of TargetContext * AsyncReplyChannel> + type IRunnerHelper = + abstract GetNextTarget : TargetContext -> Async> + let createCtxMgr (order:Target[] list) (ctx:TargetContext) = + let body (inbox:MailboxProcessor) = async { + let targetCount = + order |> Seq.sumBy (fun t -> t.Length) + let resolution = Set.ofSeq(order |> Seq.concat |> Seq.map (fun t -> t.Name)) + let inResolution (t:string) = resolution.Contains t + let mutable ctx = ctx + let mutable waitList = [] + let mutable runningTasks = [] + //let mutable remainingOrders = order + try + while true do + let! msg = inbox.Receive() + match msg with + | GetNextTarget (newCtx, reply) -> + // semantic is: + // - We never return a target twice! + // - we fill up the waitlist first + ctx <- mergeContext ctx newCtx + let known = + ctx.PreviousTargets + |> Seq.map (fun tres -> tres.Target.Name, tres) + |> dict + runningTasks <- + runningTasks + |> List.filter (fun t -> not(known.ContainsKey t.Name)) + if known.Count = targetCount then + for (w:System.Threading.Tasks.TaskCompletionSource) in waitList do + w.SetResult None + waitList <- [] + reply.Reply (ctx, async.Return None) + else + + let isRunnable (t:Target) = + not (known.ContainsKey t.Name) && // not already finised + not (runningTasks |> Seq.exists (fun r -> r.Name = t.Name)) && // not already running + t.Dependencies @ List.filter inResolution t.SoftDependencies // all dependencies finished + |> Seq.forall (fun d -> known.ContainsKey d) + let runnable = + order + |> Seq.concat + |> Seq.filter isRunnable + |> Seq.toList + + let rec getNextFreeRunableTarget (r) = + match r with + | t :: rest -> + match waitList with + | h :: restwait -> + // fill some idle worker + runningTasks <- t :: runningTasks + h.SetResult (Some t) + waitList <- restwait + getNextFreeRunableTarget rest + | [] -> Some t + | [] -> None + match getNextFreeRunableTarget runnable with + | Some free -> + runningTasks <- free :: runningTasks + reply.Reply (ctx, async.Return(Some free)) + | None -> + // queue work + let tcs = new TaskCompletionSource() + waitList <- waitList @ [ tcs ] + reply.Reply (ctx, tcs.Task |> Async.AwaitTask) + with e -> + while true do + let! msg = inbox.Receive() + match msg with + | GetNextTarget (_, reply) -> + reply.Reply (ctx, async { return raise <| exn("mailbox failed", e) }) + } + + let mbox = MailboxProcessor.Start(body) + { new IRunnerHelper with + member __.GetNextTarget (ctx) = mbox.PostAndAsyncReply(fun reply -> GetNextTarget(ctx, reply)) + } + + let runOptimal workerNum (order:Target[] list) targetContext = + let mgr = createCtxMgr order targetContext + let targetRunner () = + async { + let! (tctx, att) = mgr.GetNextTarget(targetContext) + let! tt = att + let mutable ctx = tctx + let mutable nextTarget = tt + while nextTarget.IsSome do + let newCtx = runSingleTarget nextTarget.Value ctx + let! (tctx, att) = mgr.GetNextTarget(newCtx) + let! tt = att + ctx <- tctx + nextTarget <- tt + return ctx + } |> Async.StartAsTask + Array.init workerNum (fun _ -> targetRunner()) + |> Task.WhenAll + |> Async.AwaitTask + |> Async.RunSynchronously + |> Seq.reduce mergeContext /// Runs the given array of targets in parallel using count tasks let internal runTargetsParallel (count : int) (targets : Target[]) context = @@ -530,9 +652,10 @@ module Target = if parallelJobs > 1 && not singleTarget then Trace.tracefn "Running parallel build with %d workers" parallelJobs - // run every level in parallel - order - |> Seq.fold (fun context par -> runTargetsParallel parallelJobs par context) context + // always try to keep "parallelJobs" runners busy + ParallelRunner.runOptimal parallelJobs order context + //order + // |> Seq.fold (fun context par -> runTargetsParallel parallelJobs par context) context else let targets = order |> Seq.collect id |> Seq.toArray let lastTarget = targets |> Array.last