diff --git a/src/Paket.Core/RemoteUpload.fs b/src/Paket.Core/RemoteUpload.fs index d52f8d8519..0097fca7a2 100644 --- a/src/Paket.Core/RemoteUpload.fs +++ b/src/Paket.Core/RemoteUpload.fs @@ -9,22 +9,46 @@ open Paket open Paket.Logging type System.Net.WebClient with - member x.UploadFileAsMultipart (url:Uri) filename = - let fileTemplate = "--{0}\r\nContent-Disposition: form-data; name=\"{1}\"; filename=\"{2}\"\r\nContent-Type: {3}\r\n\r\n" - let boundary = "---------------------------" + DateTime.Now.Ticks.ToString("x", CultureInfo.InvariantCulture) - let fileInfo = (new FileInfo(Path.GetFullPath(filename))) - let fileHeaderBytes = String.Format(CultureInfo.InvariantCulture, fileTemplate, boundary, "package", "package", "application/octet-stream") - |> Encoding.UTF8.GetBytes - let newlineBytes = Environment.NewLine |> Encoding.UTF8.GetBytes - let trailerbytes = String.Format(CultureInfo.InvariantCulture, "--{0}--", boundary) |> Encoding.UTF8.GetBytes - x.Headers.Add(HttpRequestHeader.ContentType, "multipart/form-data; boundary=" + boundary); - use stream = x.OpenWrite(url, "PUT") - stream.Write(fileHeaderBytes,0,fileHeaderBytes.Length) - use fileStream = File.OpenRead fileInfo.FullName - fileStream.CopyTo(stream, (4*1024)) - stream.Write(newlineBytes, 0, newlineBytes.Length) - stream.Write(trailerbytes, 0, trailerbytes.Length) - () + member x.UploadFileAsMultipartAsync (url:Uri) filename = + // event to report back completion of upload + let progressReport = new Event() + + let computation = async { + let fileTemplate = "--{0}\r\nContent-Disposition: form-data; name=\"{1}\"; filename=\"{2}\"\r\nContent-Type: {3}\r\n\r\n" + let boundary = "---------------------------" + DateTime.Now.Ticks.ToString("x", CultureInfo.InvariantCulture) + let fileInfo = (new FileInfo(Path.GetFullPath(filename))) + let fileHeaderBytes = String.Format(CultureInfo.InvariantCulture, fileTemplate, boundary, "package", "package", "application/octet-stream") + |> Encoding.UTF8.GetBytes + let newlineBytes = Environment.NewLine |> Encoding.UTF8.GetBytes + let trailerbytes = String.Format(CultureInfo.InvariantCulture, "--{0}--", boundary) |> Encoding.UTF8.GetBytes + x.Headers.Add(HttpRequestHeader.ContentType, "multipart/form-data; boundary=" + boundary); + use stream = x.OpenWrite(url, "PUT") + do! stream.AsyncWrite(fileHeaderBytes,0,fileHeaderBytes.Length) + use fileStream = File.OpenRead fileInfo.FullName + + // upload file content and report progress + do! + let fileSize = fileInfo.Length + let bufferSize = 4*1024 + let buffer = Array.zeroCreate bufferSize + + let rec upload bytesWritten = async { + let! count = fileStream.AsyncRead(buffer, 0 ,bufferSize) + if count > 0 then + if count < bufferSize then + do! stream.AsyncWrite(Array.sub buffer 0 count) + else + do! stream.AsyncWrite(buffer) + let bytesWritten = bytesWritten + (count |> int64) + do progressReport.Trigger(bytesWritten, fileSize) + return! upload bytesWritten + } + upload 0L + + do! stream.AsyncWrite(newlineBytes, 0, newlineBytes.Length) + do! stream.AsyncWrite(trailerbytes, 0, trailerbytes.Length) + } + computation, progressReport.Publish let GetUrlWithEndpoint (url: string option) (endPoint: string option) = let (|UrlWithEndpoint|_|) url = @@ -49,17 +73,42 @@ let GetUrlWithEndpoint (url: string option) (endPoint: string option) = let Push maxTrials url apiKey packageFileName = - let rec push trial = + let rec push trial = async { tracefn "Pushing package %s to %s - trial %d" packageFileName url trial try let client = Utils.createWebClient(url, None) client.Headers.Add("X-NuGet-ApiKey", apiKey) - client.UploadFileAsMultipart (new Uri(url)) packageFileName - |> ignore + let uploadAsync, progressReport = + client.UploadFileAsMultipartAsync (new Uri(url)) packageFileName + // report progress every second + let reportProgressInterval = 1000 + use progressSubscription = + progressReport + |> Observable.sample reportProgressInterval + |> Observable.pairwise + |> Observable.map + (fun ((bytesWrittenPrev,_), (bytesWritten, fileSize)) -> + let kb bytes = (bytes |> float) / 1024. + let bytesWrittenKb = bytesWritten |> kb + let fileSizeKb = fileSize |> kb + let uploadSpeed = + (bytesWrittenKb - (bytesWrittenPrev |> kb)) / + ((reportProgressInterval |> float) / 1000.) + let progressPercentage = (bytesWrittenKb / fileSizeKb) * 100. + progressPercentage, uploadSpeed, bytesWrittenKb, fileSizeKb) + |> Observable.subscribe + (fun (progressPercentage, uploadSpeed, bytesWrittenKb, fileSizeKb) -> + tracefn @"Pushing %s: %.0f of %.0f KB uploaded [%.0f %%] [%.0f KB/s]" + packageFileName + bytesWrittenKb + fileSizeKb + progressPercentage + uploadSpeed) + do! uploadAsync tracefn "Pushing %s complete." packageFileName with | exn when trial < maxTrials -> traceWarnfn "Could not push %s: %s" packageFileName exn.Message - push (trial + 1) - - push 1 \ No newline at end of file + return! push (trial + 1) + } + push 1 |> Async.RunSynchronously \ No newline at end of file diff --git a/src/Paket.Core/Utils.fs b/src/Paket.Core/Utils.fs index 90dbcf9d08..ecc71cebf7 100644 --- a/src/Paket.Core/Utils.fs +++ b/src/Paket.Core/Utils.fs @@ -328,4 +328,47 @@ let removeFile (fileName : string) = File.Delete(fileName) |> ok with _ -> FileDeleteError fileName |> fail - else ok () \ No newline at end of file + else ok () + +[] +module ObservableExtensions = + + let private synchronize f = + let ctx = System.Threading.SynchronizationContext.Current + f (fun g arg -> + let nctx = System.Threading.SynchronizationContext.Current + if ctx <> null && ctx <> nctx then + ctx.Post((fun _ -> g(arg)), null) + else + g(arg)) + + type Microsoft.FSharp.Control.Async with + static member AwaitObservable(ev1:IObservable<'a>) = + synchronize (fun f -> + Async.FromContinuations((fun (cont,econt,ccont) -> + let rec callback = (fun value -> + remover.Dispose() + f cont value ) + and remover : IDisposable = ev1.Subscribe(callback) + () ))) + + [] + module Observable = + let sample milliseconds source = + let relay (observer:IObserver<'T>) = + let rec loop () = async { + let! value = Async.AwaitObservable source + observer.OnNext value + do! Async.Sleep milliseconds + return! loop() + } + loop () + + { new IObservable<'T> with + member this.Subscribe(observer:IObserver<'T>) = + let cts = new System.Threading.CancellationTokenSource() + Async.StartImmediate(relay observer, cts.Token) + { new IDisposable with + member this.Dispose() = cts.Cancel() + } + }