From 5af3aa3a54a1b824184bb14b2a9a35cba71ac80b Mon Sep 17 00:00:00 2001 From: Matthias Dittrich Date: Thu, 25 May 2017 19:28:05 +0200 Subject: [PATCH 01/15] try to improve error handling in nuget and do only a maximum of 7 requests in parallel. --- src/Paket.Core/Common/Profile.fs | 4 + src/Paket.Core/Dependencies/NuGetV2.fs | 57 +++--- .../Dependencies/PackageResolver.fs | 183 +++++++++++++----- src/Paket/Program.fs | 25 +-- 4 files changed, 182 insertions(+), 87 deletions(-) diff --git a/src/Paket.Core/Common/Profile.fs b/src/Paket.Core/Common/Profile.fs index 64b343e356..aeed39654f 100644 --- a/src/Paket.Core/Common/Profile.fs +++ b/src/Paket.Core/Common/Profile.fs @@ -10,6 +10,7 @@ type BlockReason = type Category = | ResolverAlgorithm | ResolverAlgorithmBlocked of BlockReason + | ResolverAlgorithmNotBlocked of BlockReason | NuGetRequest | NuGetDownload | FileIO @@ -18,6 +19,9 @@ type Event = { Category: Category; Duration : TimeSpan } let events = System.Collections.Concurrent.ConcurrentBag() +let trackEvent cat = + events.Add({ Category = cat; Duration = TimeSpan() }) + let startCategory cat = let cw = Stopwatch.StartNew() let mutable wasDisposed = false diff --git a/src/Paket.Core/Dependencies/NuGetV2.fs b/src/Paket.Core/Dependencies/NuGetV2.fs index 8446b99fd0..c45c4d9b88 100644 --- a/src/Paket.Core/Dependencies/NuGetV2.fs +++ b/src/Paket.Core/Dependencies/NuGetV2.fs @@ -18,6 +18,7 @@ open Paket.Xml open Paket.PackageSources open Paket.Requirements open FSharp.Polyfill +open System.Runtime.ExceptionServices let rec private followODataLink auth url = async { @@ -648,7 +649,7 @@ let rec private getPackageDetails alternativeProjectRoot root force (sources:Pac nugetSource.Url packageName version - return Some(source,result) } + return Choice1Of2(source,result) } let tryV3 source nugetSource force = async { if nugetSource.Url.Contains("myget.org") || nugetSource.Url.Contains("nuget.org") || nugetSource.Url.Contains("visualstudio.com") || nugetSource.Url.Contains("/nuget/v3/") then @@ -661,25 +662,25 @@ let rec private getPackageDetails alternativeProjectRoot root force (sources:Pac url packageName version - return Some(source,result) + return Choice1Of2(source,result) | _ -> let! result = NuGetV3.GetPackageDetails force nugetSource packageName version - return Some(source,result) + return Choice1Of2(source,result) else let! result = NuGetV3.GetPackageDetails force nugetSource packageName version - return Some(source,result) } + return Choice1Of2(source,result) } let getPackageDetails force = // helper to work through the list sequentially - let rec trySelectFirst workLeft = + let rec trySelectFirst errors workLeft = async { match workLeft with | work :: rest -> let! r = work match r with - | Some result -> return Some result - | None -> return! trySelectFirst rest - | [] -> return None + | Choice1Of2 result -> return Choice1Of2 result + | Choice2Of2 error -> return! trySelectFirst (error::errors) rest + | [] -> return Choice2Of2 errors } sources |> List.sortBy (fun source -> @@ -717,33 +718,39 @@ let rec private getPackageDetails alternativeProjectRoot root force (sources:Pac | LocalNuGet(path,Some _) -> let! result = getDetailsFromLocalNuGetPackage true alternativeProjectRoot root path packageName version - return Some(source,result) + return Choice1Of2(source,result) | LocalNuGet(path,None) -> let! result = getDetailsFromLocalNuGetPackage false alternativeProjectRoot root path packageName version - return Some(source,result) + return Choice1Of2(source,result) with e -> if verbose then verbosefn "Source '%O' exception: %O" source e - return None }) - |> trySelectFirst + let capture = ExceptionDispatchInfo.Capture e + return Choice2Of2 capture }) + |> trySelectFirst [] let! maybePackageDetails = getPackageDetails force let! source,nugetObject = async { + let fallback () = + match sources |> List.map (fun (s:PackageSource) -> s.ToString()) with + | [source] -> + failwithf "Couldn't get package details for package %O %O on %O." packageName version source + | [] -> + failwithf "Couldn't get package details for package %O %O, because no sources were specified." packageName version + | sources -> + failwithf "Couldn't get package details for package %O %O on any of %A." packageName version sources + match maybePackageDetails with - | None -> - let! m = getPackageDetails true - match m with - | None -> - match sources |> List.map (fun (s:PackageSource) -> s.ToString()) with - | [source] -> - return failwithf "Couldn't get package details for package %O %O on %O." packageName version source - | [] -> - return failwithf "Couldn't get package details for package %O %O, because no sources were specified." packageName version - | sources -> - return failwithf "Couldn't get package details for package %O %O on any of %A." packageName version sources - | Some packageDetails -> return packageDetails - | Some packageDetails -> return packageDetails + | Choice2Of2 ([]) -> return fallback() + | Choice2Of2 (h::restError) -> + for error in restError do + if not verbose then + // Otherwise the error was already mentioned above + traceWarnfn "Ignoring: %s" error.Message + h.Throw() + return fallback() + | Choice1Of2 packageDetails -> return packageDetails } let encodeURL (url:string) = diff --git a/src/Paket.Core/Dependencies/PackageResolver.fs b/src/Paket.Core/Dependencies/PackageResolver.fs index c55545ce98..fb2195e512 100644 --- a/src/Paket.Core/Dependencies/PackageResolver.fs +++ b/src/Paket.Core/Dependencies/PackageResolver.fs @@ -9,6 +9,8 @@ open System.Collections.Generic open System open System.Diagnostics open Paket.PackageSources +open System.Threading.Tasks +open System.Threading type DependencySet = Set @@ -641,35 +643,110 @@ type private Stage = | Outer of currentConflict : (ConflictState * ResolverStep * PackageRequirement) * priorConflictSteps : (ConflictState * ResolverStep * PackageRequirement * seq * StepFlags) list | Inner of currentConflict : (ConflictState * ResolverStep * PackageRequirement) * priorConflictSteps : (ConflictState * ResolverStep * PackageRequirement * seq * StepFlags) list +type WorkPriority = + | BackgroundWork = 10 + | BlockingWork = 1 + +type RequestWork = + private + { StartWork : unit -> System.Threading.Tasks.Task + mutable Priority : WorkPriority } + +type WorkHandle<'a> = private { Handle : Guid; Work : RequestWork; TaskSource : TaskCompletionSource<'a> } +and ResolverRequestQueue = + private { DynamicQueue : System.Collections.Concurrent.ConcurrentDictionary } +module ResolverRequestQueue = + open System.Threading + + let Create() = { DynamicQueue = new System.Collections.Concurrent.ConcurrentDictionary(); } + let addWork prio (f: unit -> System.Threading.Tasks.Task<'a>) ({ DynamicQueue = queue } as q) = + let tcs = new TaskCompletionSource<_>() + let work = + { StartWork = (fun () -> + f().ContinueWith(fun (t:System.Threading.Tasks.Task<'a>) -> + if t.IsCanceled then + tcs.SetException(new TaskCanceledException(t)) + elif t.IsFaulted then + tcs.SetException(t.Exception) + else tcs.SetResult (t.Result))) + Priority = prio } + let handle = Guid.NewGuid() + let res = queue.TryAdd(handle, work) + assert res + { Handle = handle; Work = work; TaskSource = tcs } + let rec private getNext ({ DynamicQueue = queue } as d) = + if queue.Count = 0 then None + else + let min = queue |> Seq.minBy (fun kv -> kv.Value.Priority) + match queue.TryRemove(min.Key) with + | true, min -> + Some min + | _ -> + getNext d + let startProcessing (cts:CancellationToken) ({ DynamicQueue = queue } as q) = + async { + while not cts.IsCancellationRequested do + match getNext q with + | None -> do! Async.Sleep 10 + | Some work -> + do! work.StartWork().ContinueWith(fun (t:System.Threading.Tasks.Task) ->()) |> Async.AwaitTask + } + |> Async.StartAsTask + +type WorkHandle<'a> with + member x.Reprioritize prio = + let { Work = work } = x + work.Priority <- prio + member x.Task = + let { TaskSource = task } = x + task.Task + /// Resolves all direct and transitive dependencies let Resolve (getVersionsRaw, getPreferredVersionsRaw, getPackageDetailsRaw, groupName:GroupName, globalStrategyForDirectDependencies, globalStrategyForTransitives, globalFrameworkRestrictions, (rootDependencies:PackageRequirement Set), updateMode : UpdateMode) = tracefn "Resolving packages for group %O:" groupName use d = Profile.startCategory Profile.Category.ResolverAlgorithm + use cts = new CancellationTokenSource() + let workerQueue = ResolverRequestQueue.Create() + let workers = + // start maximal 7 requests at the same time. + [ 0 .. 7 ] + |> Seq.map (fun _ -> ResolverRequestQueue.startProcessing cts.Token workerQueue) + + let getAndReport blockReason (workHandle:WorkHandle<_>) = + if workHandle.Task.IsCompleted then + Profile.trackEvent (Profile.Category.ResolverAlgorithmNotBlocked blockReason) + workHandle.Task.Result + else + workHandle.Reprioritize WorkPriority.BlockingWork + use d = Profile.startCategory (Profile.Category.ResolverAlgorithmBlocked blockReason) + let result = workHandle.Task.Result + d.Dispose() + result - let startedGetPackageDetailsRequests = System.Collections.Concurrent.ConcurrentDictionary<_,System.Threading.Tasks.Task<_>>() + let startedGetPackageDetailsRequests = System.Collections.Concurrent.ConcurrentDictionary<_,WorkHandle<_>>() let startRequestGetPackageDetails sources groupName packageName semVer = let key = (sources, packageName, semVer) startedGetPackageDetailsRequests.GetOrAdd (key, fun _ -> - (getPackageDetailsRaw sources groupName packageName semVer : Async) - |> Async.StartAsTask) + workerQueue + |> ResolverRequestQueue.addWork WorkPriority.BackgroundWork (fun () -> + (getPackageDetailsRaw sources groupName packageName semVer : Async) + |> Async.StartAsTask)) let getPackageDetailsBlock sources groupName packageName semVer = - use d = Profile.startCategory (Profile.Category.ResolverAlgorithmBlocked Profile.BlockReason.PackageDetails) - let result = (startRequestGetPackageDetails sources groupName packageName semVer).GetAwaiter().GetResult() - d.Dispose() - result - + let workHandle = startRequestGetPackageDetails sources groupName packageName semVer + getAndReport Profile.BlockReason.PackageDetails workHandle - let startedGetVersionsRequests = System.Collections.Concurrent.ConcurrentDictionary<_,System.Threading.Tasks.Task<_>>() + let startedGetVersionsRequests = System.Collections.Concurrent.ConcurrentDictionary<_,WorkHandle<_>>() let startRequestGetVersions sources groupName packageName = let key = (sources, packageName) startedGetVersionsRequests.GetOrAdd (key, fun _ -> - getVersionsRaw sources groupName packageName - |> Async.StartAsTask) + workerQueue + |> ResolverRequestQueue.addWork WorkPriority.BackgroundWork (fun () -> + getVersionsRaw sources groupName packageName + |> Async.StartAsTask)) let getVersionsBlock sources resolverStrategy groupName packageName = - use d = Profile.startCategory (Profile.Category.ResolverAlgorithmBlocked Profile.BlockReason.GetVersion) - let versions = (startRequestGetVersions sources groupName packageName).GetAwaiter().GetResult() |> Seq.toList - d.Dispose() + let workHandle = startRequestGetVersions sources groupName packageName + let versions = getAndReport Profile.BlockReason.GetVersion workHandle let sorted = match resolverStrategy with | ResolverStrategy.Max -> List.sortDescending versions @@ -879,11 +956,11 @@ let Resolve (getVersionsRaw, getPreferredVersionsRaw, getPackageDetailsRaw, grou // Start pre-loading infos about dependencies. for (pack,verReq,restr) in exploredPackage.Dependencies do async { - let! versions = startRequestGetVersions currentRequirement.Sources groupName pack |> Async.AwaitTask + let! versions = (startRequestGetVersions currentRequirement.Sources groupName pack).Task |> Async.AwaitTask // Preload the first version in range of this requirement match versions |> Seq.map fst |> Seq.tryFind (verReq.IsInRange) with | Some verToPreload -> - let! details = startRequestGetPackageDetails currentRequirement.Sources groupName pack verToPreload |> Async.AwaitTask + let! details = (startRequestGetPackageDetails currentRequirement.Sources groupName pack verToPreload).Task |> Async.AwaitTask () | None -> () return () @@ -963,41 +1040,47 @@ let Resolve (getVersionsRaw, getPreferredVersionsRaw, getPackageDetailsRaw, grou } let inline calculate () = step (Step((currentConflict,startingStep,currentRequirement),[])) stackpack Seq.empty flags + + try #if DEBUG - let mutable results = None - let mutable error = None - // Increase stack size, because we have no tail-call-elimination - let thread = new System.Threading.Thread((fun () -> - try - results <- Some (calculate()) - with e -> - // Prevent the application from crashing - error <- Some (System.Runtime.ExceptionServices.ExceptionDispatchInfo.Capture e) - ), 1024 * 1024 * 100) - thread.Name <- sprintf "Paket Resolver Thread (Debug) - %O" (System.Guid.NewGuid()) - thread.Start() - thread.Join() - match error with - | Some e -> e.Throw() - | _ -> () - let stepResult = - match results with - | Some s -> s - | None -> failwithf "Expected to get results from the resolver thread :/." + let mutable results = None + let mutable error = None + // Increase stack size, because we have no tail-call-elimination + let thread = new System.Threading.Thread((fun () -> + try + results <- Some (calculate()) + with e -> + // Prevent the application from crashing + error <- Some (System.Runtime.ExceptionServices.ExceptionDispatchInfo.Capture e) + ), 1024 * 1024 * 100) + thread.Name <- sprintf "Paket Resolver Thread (Debug) - %O" (System.Guid.NewGuid()) + thread.Start() + thread.Join() + match error with + | Some e -> e.Throw() + | _ -> () + let stepResult = + match results with + | Some s -> s + | None -> failwithf "Expected to get results from the resolver thread :/." #else - let stepResult = calculate() + let stepResult = calculate() #endif - match stepResult with - | { Status = Resolution.Conflict _ } as conflict -> - if conflict.TryRelaxed then - stackpack.KnownConflicts.Clear() - stackpack.ConflictHistory.Clear() - (step (Step((conflict - ,{startingStep with Relax=true} - ,currentRequirement),[])) - stackpack Seq.empty flags).Status - else - conflict.Status - | x -> x.Status - + match stepResult with + | { Status = Resolution.Conflict _ } as conflict -> + if conflict.TryRelaxed then + stackpack.KnownConflicts.Clear() + stackpack.ConflictHistory.Clear() + (step (Step((conflict + ,{startingStep with Relax=true} + ,currentRequirement),[])) + stackpack Seq.empty flags).Status + else + conflict.Status + | x -> x.Status + finally + // some cleanup + cts.Cancel() + for w in workers do + w.Wait() diff --git a/src/Paket/Program.fs b/src/Paket/Program.fs index 018f4b193e..f979238d42 100644 --- a/src/Paket/Program.fs +++ b/src/Paket/Program.fs @@ -64,26 +64,27 @@ let processWithValidation silent validateF commandF (result : ParseResults<'T>) match cat with | Profile.Category.ResolverAlgorithm -> 1 | Profile.Category.ResolverAlgorithmBlocked b -> 2 - | Profile.Category.FileIO -> 3 - | Profile.Category.NuGetDownload -> 4 - | Profile.Category.NuGetRequest -> 5 - | Profile.Category.Other -> 6) + | Profile.Category.ResolverAlgorithmNotBlocked b -> 3 + | Profile.Category.FileIO -> 4 + | Profile.Category.NuGetDownload -> 5 + | Profile.Category.NuGetRequest -> 6 + | Profile.Category.Other -> 7) |> List.iter (fun (cat, num, elapsed) -> + let reason b = + match b with + | Profile.BlockReason.PackageDetails -> "retrieving package details" + | Profile.BlockReason.GetVersion -> "retrieving package versions" match cat with | Profile.Category.ResolverAlgorithm -> tracefn " - Resolver: %s (%d runs)" (Utils.TimeSpanToReadableString elapsed) num let realTime = resolver - blocked tracefn " - Runtime: %s" (Utils.TimeSpanToReadableString realTime) - let blockNum = blockedRaw |> Seq.sumBy (fun (_, num, _) -> num) - let blockPaket4 = TimeSpan.FromMilliseconds(500.0 * float blockNum) - tracefn " - Runtime Paket 4 (estimated ~500ms respose*): %s" (Utils.TimeSpanToReadableString (realTime + blockPaket4)) - tracefn " * See http://stats.pingdom.com/aqicaf2upspo/1265300 for average response times." | Profile.Category.ResolverAlgorithmBlocked b -> - let reason = - match b with - | Profile.BlockReason.PackageDetails -> "retrieving package details" - | Profile.BlockReason.GetVersion -> "retrieving package versions" + let reason = reason b tracefn " - Blocked (%s): %s (%d times)" reason (Utils.TimeSpanToReadableString elapsed) num + | Profile.Category.ResolverAlgorithmNotBlocked b -> + let reason = reason b + tracefn " - Not Blocked (%s): %d times" reason num | Profile.Category.FileIO -> tracefn " - Disk IO: %s" (Utils.TimeSpanToReadableString elapsed) | Profile.Category.NuGetDownload -> From 03a85920bfb16f1d6cb0de1166e32fef88c9da0a Mon Sep 17 00:00:00 2001 From: Matthias Dittrich Date: Thu, 25 May 2017 19:33:10 +0200 Subject: [PATCH 02/15] fix compilation errors --- src/Paket.Core/Dependencies/NuGetV2.fs | 2 +- src/Paket.Core/Dependencies/PackageResolver.fs | 2 +- 2 files changed, 2 insertions(+), 2 deletions(-) diff --git a/src/Paket.Core/Dependencies/NuGetV2.fs b/src/Paket.Core/Dependencies/NuGetV2.fs index c45c4d9b88..934c2f07cb 100644 --- a/src/Paket.Core/Dependencies/NuGetV2.fs +++ b/src/Paket.Core/Dependencies/NuGetV2.fs @@ -747,7 +747,7 @@ let rec private getPackageDetails alternativeProjectRoot root force (sources:Pac for error in restError do if not verbose then // Otherwise the error was already mentioned above - traceWarnfn "Ignoring: %s" error.Message + traceWarnfn "Ignoring: %s" error.SourceException.Message h.Throw() return fallback() | Choice1Of2 packageDetails -> return packageDetails diff --git a/src/Paket.Core/Dependencies/PackageResolver.fs b/src/Paket.Core/Dependencies/PackageResolver.fs index fb2195e512..b12574d4b6 100644 --- a/src/Paket.Core/Dependencies/PackageResolver.fs +++ b/src/Paket.Core/Dependencies/PackageResolver.fs @@ -746,7 +746,7 @@ let Resolve (getVersionsRaw, getPreferredVersionsRaw, getPackageDetailsRaw, grou |> Async.StartAsTask)) let getVersionsBlock sources resolverStrategy groupName packageName = let workHandle = startRequestGetVersions sources groupName packageName - let versions = getAndReport Profile.BlockReason.GetVersion workHandle + let versions = getAndReport Profile.BlockReason.GetVersion workHandle |> Seq.toList let sorted = match resolverStrategy with | ResolverStrategy.Max -> List.sortDescending versions From bba98107123c45ee9d71ffb8d5663b2f6c975074 Mon Sep 17 00:00:00 2001 From: Matthias Dittrich Date: Thu, 25 May 2017 19:58:06 +0200 Subject: [PATCH 03/15] semveral bugfixes in the new scheduler --- .../Dependencies/PackageResolver.fs | 39 ++++++++++++++----- 1 file changed, 30 insertions(+), 9 deletions(-) diff --git a/src/Paket.Core/Dependencies/PackageResolver.fs b/src/Paket.Core/Dependencies/PackageResolver.fs index b12574d4b6..f8cdb45214 100644 --- a/src/Paket.Core/Dependencies/PackageResolver.fs +++ b/src/Paket.Core/Dependencies/PackageResolver.fs @@ -662,8 +662,17 @@ module ResolverRequestQueue = let addWork prio (f: unit -> System.Threading.Tasks.Task<'a>) ({ DynamicQueue = queue } as q) = let tcs = new TaskCompletionSource<_>() let work = - { StartWork = (fun () -> - f().ContinueWith(fun (t:System.Threading.Tasks.Task<'a>) -> + { StartWork = (fun () -> + let t = + try + f() + with e -> + //Task.FromException (e) + let tcs = new TaskCompletionSource<_>() + tcs.SetException e + tcs.Task + + t.ContinueWith(fun (t:System.Threading.Tasks.Task<'a>) -> if t.IsCanceled then tcs.SetException(new TaskCanceledException(t)) elif t.IsFaulted then @@ -677,12 +686,21 @@ module ResolverRequestQueue = let rec private getNext ({ DynamicQueue = queue } as d) = if queue.Count = 0 then None else - let min = queue |> Seq.minBy (fun kv -> kv.Value.Priority) - match queue.TryRemove(min.Key) with - | true, min -> - Some min - | _ -> - getNext d + let min = + // cannot minBy as the sequence might be empty by now. + queue |> Seq.fold (fun currentMin kv -> + match currentMin with + | None -> Some (kv.Key, kv.Value) + | Some (minKey, min) when kv.Value.Priority < min.Priority -> Some (kv.Key, kv.Value) + | min -> min) None + match min with + | Some (minKey, min) -> + match queue.TryRemove(minKey) with + | true, min -> + Some min + | _ -> + getNext d + | None -> getNext d let startProcessing (cts:CancellationToken) ({ DynamicQueue = queue } as q) = async { while not cts.IsCancellationRequested do @@ -711,7 +729,7 @@ let Resolve (getVersionsRaw, getPreferredVersionsRaw, getPackageDetailsRaw, grou let workers = // start maximal 7 requests at the same time. [ 0 .. 7 ] - |> Seq.map (fun _ -> ResolverRequestQueue.startProcessing cts.Token workerQueue) + |> List.map (fun _ -> ResolverRequestQueue.startProcessing cts.Token workerQueue) let getAndReport blockReason (workHandle:WorkHandle<_>) = if workHandle.Task.IsCompleted then @@ -720,6 +738,9 @@ let Resolve (getVersionsRaw, getPreferredVersionsRaw, getPackageDetailsRaw, grou else workHandle.Reprioritize WorkPriority.BlockingWork use d = Profile.startCategory (Profile.Category.ResolverAlgorithmBlocked blockReason) + let isFinished = workHandle.Task.Wait(60000) + if not isFinished then + raise <| new TimeoutException("Waited 60 seconds for a request to finish, maybe a bug in the paket request scheduler.") let result = workHandle.Task.Result d.Dispose() result From 053508c06b17725e1bca1f06c834a21c03962bb2 Mon Sep 17 00:00:00 2001 From: Matthias Dittrich Date: Mon, 29 May 2017 18:27:23 +0200 Subject: [PATCH 04/15] improve the implementation to use locks and no more sleep. --- .../Dependencies/PackageResolver.fs | 82 +++++++++++-------- 1 file changed, 46 insertions(+), 36 deletions(-) diff --git a/src/Paket.Core/Dependencies/PackageResolver.fs b/src/Paket.Core/Dependencies/PackageResolver.fs index f8cdb45214..6e8ea8b7a3 100644 --- a/src/Paket.Core/Dependencies/PackageResolver.fs +++ b/src/Paket.Core/Dependencies/PackageResolver.fs @@ -649,23 +649,52 @@ type WorkPriority = type RequestWork = private - { StartWork : unit -> System.Threading.Tasks.Task + { StartWork : CancellationToken -> System.Threading.Tasks.Task mutable Priority : WorkPriority } -type WorkHandle<'a> = private { Handle : Guid; Work : RequestWork; TaskSource : TaskCompletionSource<'a> } +type WorkHandle<'a> = private { Work : RequestWork; TaskSource : TaskCompletionSource<'a> } and ResolverRequestQueue = - private { DynamicQueue : System.Collections.Concurrent.ConcurrentDictionary } + private { DynamicQueue : ResizeArray; Lock : obj; WaitingWorker : ResizeArray> } + // callback in a lock is bad practice.. + member private x.With callback = + lock x.Lock (fun () -> + callback x.DynamicQueue x.WaitingWorker + ) + member x.AddWork w = + x.With (fun queue workers -> + if workers.Count > 0 then + let worker = workers.[0] + workers.RemoveAt(0) + worker.SetResult (Some w) + else + queue.Add(w) + ) + member x.GetWork (ct:CancellationToken) = + let tcs = new TaskCompletionSource<_>() + let registration = ct.Register(fun () -> tcs.TrySetResult None |> ignore) + tcs.Task.ContinueWith (fun (t:Task) -> + registration.Dispose()) |> ignore + x.With (fun queue workers -> + if queue.Count = 0 then + workers.Add(tcs) + else + let (index, work) = queue |> Seq.mapi (fun i w -> i,w) |> Seq.minBy (fun (i,w) -> w.Priority) + queue.RemoveAt index + tcs.TrySetResult (Some work) |> ignore + tcs.Task + ) + module ResolverRequestQueue = open System.Threading - let Create() = { DynamicQueue = new System.Collections.Concurrent.ConcurrentDictionary(); } - let addWork prio (f: unit -> System.Threading.Tasks.Task<'a>) ({ DynamicQueue = queue } as q) = + let Create() = { DynamicQueue = new ResizeArray(); Lock = new obj(); WaitingWorker = new ResizeArray<_>() } + let addWork prio (f: CancellationToken -> System.Threading.Tasks.Task<'a>) ({ DynamicQueue = queue } as q) = let tcs = new TaskCompletionSource<_>() let work = - { StartWork = (fun () -> + { StartWork = (fun tok -> let t = try - f() + f tok with e -> //Task.FromException (e) let tcs = new TaskCompletionSource<_>() @@ -679,35 +708,16 @@ module ResolverRequestQueue = tcs.SetException(t.Exception) else tcs.SetResult (t.Result))) Priority = prio } - let handle = Guid.NewGuid() - let res = queue.TryAdd(handle, work) - assert res - { Handle = handle; Work = work; TaskSource = tcs } - let rec private getNext ({ DynamicQueue = queue } as d) = - if queue.Count = 0 then None - else - let min = - // cannot minBy as the sequence might be empty by now. - queue |> Seq.fold (fun currentMin kv -> - match currentMin with - | None -> Some (kv.Key, kv.Value) - | Some (minKey, min) when kv.Value.Priority < min.Priority -> Some (kv.Key, kv.Value) - | min -> min) None - match min with - | Some (minKey, min) -> - match queue.TryRemove(minKey) with - | true, min -> - Some min - | _ -> - getNext d - | None -> getNext d - let startProcessing (cts:CancellationToken) ({ DynamicQueue = queue } as q) = + q.AddWork work + { Work = work; TaskSource = tcs } + let startProcessing (ct:CancellationToken) ({ DynamicQueue = queue } as q) = async { - while not cts.IsCancellationRequested do - match getNext q with - | None -> do! Async.Sleep 10 + while not ct.IsCancellationRequested do + let! work = q.GetWork(ct) |> Async.AwaitTask + match work with | Some work -> - do! work.StartWork().ContinueWith(fun (t:System.Threading.Tasks.Task) ->()) |> Async.AwaitTask + do! work.StartWork(ct).ContinueWith(fun (t:System.Threading.Tasks.Task) -> ()) |> Async.AwaitTask + | None -> () } |> Async.StartAsTask @@ -750,7 +760,7 @@ let Resolve (getVersionsRaw, getPreferredVersionsRaw, getPackageDetailsRaw, grou let key = (sources, packageName, semVer) startedGetPackageDetailsRequests.GetOrAdd (key, fun _ -> workerQueue - |> ResolverRequestQueue.addWork WorkPriority.BackgroundWork (fun () -> + |> ResolverRequestQueue.addWork WorkPriority.BackgroundWork (fun ct -> (getPackageDetailsRaw sources groupName packageName semVer : Async) |> Async.StartAsTask)) let getPackageDetailsBlock sources groupName packageName semVer = @@ -762,7 +772,7 @@ let Resolve (getVersionsRaw, getPreferredVersionsRaw, getPackageDetailsRaw, grou let key = (sources, packageName) startedGetVersionsRequests.GetOrAdd (key, fun _ -> workerQueue - |> ResolverRequestQueue.addWork WorkPriority.BackgroundWork (fun () -> + |> ResolverRequestQueue.addWork WorkPriority.BackgroundWork (fun ct -> getVersionsRaw sources groupName packageName |> Async.StartAsTask)) let getVersionsBlock sources resolverStrategy groupName packageName = From 90edcacde13e9830c3d96b8c05b434b6e718cd4f Mon Sep 17 00:00:00 2001 From: Matthias Dittrich Date: Mon, 29 May 2017 19:06:51 +0200 Subject: [PATCH 05/15] fix project files and improve error message, lower timeout. --- .../Paket.IntegrationTests.fsproj | 7 ++++++- src/Paket.Core/Dependencies/PackageResolver.fs | 15 ++++++++++----- src/Paket.Core/Paket.Core.fsproj | 7 ++++++- src/Paket/Paket.fsproj | 11 ++++++++--- 4 files changed, 30 insertions(+), 10 deletions(-) diff --git a/integrationtests/Paket.IntegrationTests/Paket.IntegrationTests.fsproj b/integrationtests/Paket.IntegrationTests/Paket.IntegrationTests.fsproj index afba1f1cef..d00a900d0c 100644 --- a/integrationtests/Paket.IntegrationTests/Paket.IntegrationTests.fsproj +++ b/integrationtests/Paket.IntegrationTests/Paket.IntegrationTests.fsproj @@ -47,6 +47,11 @@ 11 + + + $(MSBuildExtensionsPath32)\Microsoft\VisualStudio\v15.0\FSharp\Microsoft.FSharp.Targets + + $(MSBuildExtensionsPath32)\..\Microsoft SDKs\F#\4.1\Framework\v4.0\Microsoft.FSharp.Targets @@ -73,7 +78,7 @@ - +