From 0b735eb99132cddce327c5a82d1d96d2d0295187 Mon Sep 17 00:00:00 2001 From: Matthias Dittrich Date: Thu, 17 May 2018 12:27:28 +0200 Subject: [PATCH 1/3] make sure to start targets as early as possible when all dependencies are fullfilled --- src/app/Fake.Core.Target/Target.fs | 112 ++++++++++++++++++++++++++++- 1 file changed, 109 insertions(+), 3 deletions(-) diff --git a/src/app/Fake.Core.Target/Target.fs b/src/app/Fake.Core.Target/Target.fs index 6ce5d6c2f77..cd03741950b 100644 --- a/src/app/Fake.Core.Target/Target.fs +++ b/src/app/Fake.Core.Target/Target.fs @@ -4,6 +4,7 @@ open System open System.Collections.Generic open Fake.Core open Fake.Core.CommandLineParsing +open System.Threading.Tasks module internal TargetCli = let targetCli = @@ -484,6 +485,110 @@ module Target = else { context with PreviousTargets = context.PreviousTargets @ [{ Error = None; Time = TimeSpan.Zero; Target = target; WasSkipped = true }] } + module internal ParallelRunner = + let internal mergeContext (ctx1:TargetContext) (ctx2:TargetContext) = + let known = + ctx1.PreviousTargets + |> Seq.map (fun tres -> tres.Target.Name, tres) + |> dict + let filterKnown targets = + targets + |> List.filter (fun tres -> not (known.ContainsKey tres.Target.Name)) + { ctx1 with + PreviousTargets = + ctx1.PreviousTargets @ filterKnown ctx2.PreviousTargets + } + // Centralized handling of target context and next target logic... + type RunnerHelper = + | GetNextTarget of TargetContext * AsyncReplyChannel> + type IRunnerHelper = + abstract GetNextTarget : TargetContext -> Async> + let createCtxMgr (order:Target[] list) (ctx:TargetContext) = + let body (inbox:MailboxProcessor) = async { + let targetCount = + order |> Seq.sumBy (fun t -> t.Length) + let mutable ctx = ctx + let mutable waitList = [] + let mutable runningTasks = [] + //let mutable remainingOrders = order + while true do + let! msg = inbox.Receive() + match msg with + | GetNextTarget (newCtx, reply) -> + // semantic is: + // - We never return a target twice! + // - we fill up the waitlist first + ctx <- mergeContext ctx newCtx + let known = + ctx.PreviousTargets + |> Seq.map (fun tres -> tres.Target.Name, tres) + |> dict + runningTasks <- + runningTasks + |> List.filter (fun t -> not(known.ContainsKey t.Name)) + if known.Count = targetCount then + for (w:System.Threading.Tasks.TaskCompletionSource) in waitList do + w.SetResult None + waitList <- [] + reply.Reply (ctx, async.Return None) + else + let isRunnable (t:Target) = + not (known.ContainsKey t.Name) && // not already finised + not (runningTasks |> Seq.exists (fun r -> r.Name = t.Name)) && // not already running + t.Dependencies // all dependencies finished + |> Seq.forall (fun d -> known.ContainsKey d) + let runnable = + order + |> Seq.concat + |> Seq.filter isRunnable + |> Seq.toList + + let rec getNextFreeRunableTarget (r) = + match r with + | t :: rest -> + match waitList with + | h :: restwait -> + h.SetResult (Some t) + waitList <- restwait + getNextFreeRunableTarget rest + | [] -> Some t + | [] -> None + match getNextFreeRunableTarget runnable with + | Some free -> + reply.Reply (ctx, async.Return(Some free)) + | None -> + // queue work + let tcs = new TaskCompletionSource() + waitList <- waitList @ [ tcs ] + reply.Reply (ctx, tcs.Task |> Async.AwaitTask) + } + + let mbox = MailboxProcessor.Start(body) + { new IRunnerHelper with + member __.GetNextTarget (ctx) = mbox.PostAndAsyncReply(fun reply -> GetNextTarget(ctx, reply)) + } + + let runOptimal workerNum (order:Target[] list) targetContext = + let mgr = createCtxMgr order targetContext + let targetRunner () = + async { + let! (tctx, att) = mgr.GetNextTarget(targetContext) + let! tt = att + let mutable ctx = tctx + let mutable nextTarget = tt + while nextTarget.IsSome do + let newCtx = runSingleTarget nextTarget.Value ctx + let! (tctx, att) = mgr.GetNextTarget(newCtx) + let! tt = att + ctx <- tctx + nextTarget <- tt + return ctx + } |> Async.StartAsTask + Array.init workerNum (fun _ -> targetRunner()) + |> Task.WhenAll + |> Async.AwaitTask + |> Async.RunSynchronously + |> Seq.reduce mergeContext /// Runs the given array of targets in parallel using count tasks let internal runTargetsParallel (count : int) (targets : Target[]) context = @@ -530,9 +635,10 @@ module Target = if parallelJobs > 1 && not singleTarget then Trace.tracefn "Running parallel build with %d workers" parallelJobs - // run every level in parallel - order - |> Seq.fold (fun context par -> runTargetsParallel parallelJobs par context) context + // always try to keep "parallelJobs" runners busy + ParallelRunner.runOptimal parallelJobs order context + //order + // |> Seq.fold (fun context par -> runTargetsParallel parallelJobs par context) context else let targets = order |> Seq.collect id |> Seq.toArray let lastTarget = targets |> Array.last From 3898b50c68eb3b91fad4e22e52c95234651da47a Mon Sep 17 00:00:00 2001 From: Matthias Dittrich Date: Thu, 17 May 2018 14:10:41 +0200 Subject: [PATCH 2/3] fix https://github.com/fsharp/FAKE/issues/1929 and properly fill runningTasks to no execute tasks multiple times... --- src/app/Fake.Core.Target/Target.fs | 116 ++++++++++++++++------------- 1 file changed, 65 insertions(+), 51 deletions(-) diff --git a/src/app/Fake.Core.Target/Target.fs b/src/app/Fake.Core.Target/Target.fs index cd03741950b..4b36130afe7 100644 --- a/src/app/Fake.Core.Target/Target.fs +++ b/src/app/Fake.Core.Target/Target.fs @@ -481,7 +481,11 @@ module Target = let internal runSingleTarget (target : Target) (context:TargetContext) = if not context.HasError then use t = Trace.traceTarget target.Name (match target.Description with Some d -> d | _ -> "NoDescription") (dependencyString target) - runSimpleContextInternal target context + let res = runSimpleContextInternal target context + if res.HasError + then t.MarkFailed() + else t.MarkSuccess() + res else { context with PreviousTargets = context.PreviousTargets @ [{ Error = None; Time = TimeSpan.Zero; Target = target; WasSkipped = true }] } @@ -511,56 +515,66 @@ module Target = let mutable waitList = [] let mutable runningTasks = [] //let mutable remainingOrders = order - while true do - let! msg = inbox.Receive() - match msg with - | GetNextTarget (newCtx, reply) -> - // semantic is: - // - We never return a target twice! - // - we fill up the waitlist first - ctx <- mergeContext ctx newCtx - let known = - ctx.PreviousTargets - |> Seq.map (fun tres -> tres.Target.Name, tres) - |> dict - runningTasks <- - runningTasks - |> List.filter (fun t -> not(known.ContainsKey t.Name)) - if known.Count = targetCount then - for (w:System.Threading.Tasks.TaskCompletionSource) in waitList do - w.SetResult None - waitList <- [] - reply.Reply (ctx, async.Return None) - else - let isRunnable (t:Target) = - not (known.ContainsKey t.Name) && // not already finised - not (runningTasks |> Seq.exists (fun r -> r.Name = t.Name)) && // not already running - t.Dependencies // all dependencies finished - |> Seq.forall (fun d -> known.ContainsKey d) - let runnable = - order - |> Seq.concat - |> Seq.filter isRunnable - |> Seq.toList - - let rec getNextFreeRunableTarget (r) = - match r with - | t :: rest -> - match waitList with - | h :: restwait -> - h.SetResult (Some t) - waitList <- restwait - getNextFreeRunableTarget rest - | [] -> Some t - | [] -> None - match getNextFreeRunableTarget runnable with - | Some free -> - reply.Reply (ctx, async.Return(Some free)) - | None -> - // queue work - let tcs = new TaskCompletionSource() - waitList <- waitList @ [ tcs ] - reply.Reply (ctx, tcs.Task |> Async.AwaitTask) + try + while true do + let! msg = inbox.Receive() + match msg with + | GetNextTarget (newCtx, reply) -> + // semantic is: + // - We never return a target twice! + // - we fill up the waitlist first + ctx <- mergeContext ctx newCtx + let known = + ctx.PreviousTargets + |> Seq.map (fun tres -> tres.Target.Name, tres) + |> dict + runningTasks <- + runningTasks + |> List.filter (fun t -> not(known.ContainsKey t.Name)) + if known.Count = targetCount then + for (w:System.Threading.Tasks.TaskCompletionSource) in waitList do + w.SetResult None + waitList <- [] + reply.Reply (ctx, async.Return None) + else + let isRunnable (t:Target) = + not (known.ContainsKey t.Name) && // not already finised + not (runningTasks |> Seq.exists (fun r -> r.Name = t.Name)) && // not already running + t.Dependencies // all dependencies finished + |> Seq.forall (fun d -> known.ContainsKey d) + let runnable = + order + |> Seq.concat + |> Seq.filter isRunnable + |> Seq.toList + + let rec getNextFreeRunableTarget (r) = + match r with + | t :: rest -> + match waitList with + | h :: restwait -> + // fill some idle worker + runningTasks <- t :: runningTasks + h.SetResult (Some t) + waitList <- restwait + getNextFreeRunableTarget rest + | [] -> Some t + | [] -> None + match getNextFreeRunableTarget runnable with + | Some free -> + runningTasks <- free :: runningTasks + reply.Reply (ctx, async.Return(Some free)) + | None -> + // queue work + let tcs = new TaskCompletionSource() + waitList <- waitList @ [ tcs ] + reply.Reply (ctx, tcs.Task |> Async.AwaitTask) + with e -> + while true do + let! msg = inbox.Receive() + match msg with + | GetNextTarget (_, reply) -> + reply.Reply (ctx, async { return raise <| exn("mailbox failed", e) }) } let mbox = MailboxProcessor.Start(body) From 0cba748797e5832bf67af6346fcc252cbe932b4f Mon Sep 17 00:00:00 2001 From: Matthias Dittrich Date: Thu, 17 May 2018 17:13:39 +0200 Subject: [PATCH 3/3] consider soft targets --- src/app/Fake.Core.Target/Target.fs | 5 ++++- 1 file changed, 4 insertions(+), 1 deletion(-) diff --git a/src/app/Fake.Core.Target/Target.fs b/src/app/Fake.Core.Target/Target.fs index 4b36130afe7..1d6556e5ed2 100644 --- a/src/app/Fake.Core.Target/Target.fs +++ b/src/app/Fake.Core.Target/Target.fs @@ -511,6 +511,8 @@ module Target = let body (inbox:MailboxProcessor) = async { let targetCount = order |> Seq.sumBy (fun t -> t.Length) + let resolution = Set.ofSeq(order |> Seq.concat |> Seq.map (fun t -> t.Name)) + let inResolution (t:string) = resolution.Contains t let mutable ctx = ctx let mutable waitList = [] let mutable runningTasks = [] @@ -537,10 +539,11 @@ module Target = waitList <- [] reply.Reply (ctx, async.Return None) else + let isRunnable (t:Target) = not (known.ContainsKey t.Name) && // not already finised not (runningTasks |> Seq.exists (fun r -> r.Name = t.Name)) && // not already running - t.Dependencies // all dependencies finished + t.Dependencies @ List.filter inResolution t.SoftDependencies // all dependencies finished |> Seq.forall (fun d -> known.ContainsKey d) let runnable = order