Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

show upload progress for paket push #695

Merged
merged 7 commits into from Mar 12, 2015
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
93 changes: 71 additions & 22 deletions src/Paket.Core/RemoteUpload.fs
Original file line number Diff line number Diff line change
Expand Up @@ -9,22 +9,46 @@ open Paket
open Paket.Logging

type System.Net.WebClient with
member x.UploadFileAsMultipart (url:Uri) filename =
let fileTemplate = "--{0}\r\nContent-Disposition: form-data; name=\"{1}\"; filename=\"{2}\"\r\nContent-Type: {3}\r\n\r\n"
let boundary = "---------------------------" + DateTime.Now.Ticks.ToString("x", CultureInfo.InvariantCulture)
let fileInfo = (new FileInfo(Path.GetFullPath(filename)))
let fileHeaderBytes = String.Format(CultureInfo.InvariantCulture, fileTemplate, boundary, "package", "package", "application/octet-stream")
|> Encoding.UTF8.GetBytes
let newlineBytes = Environment.NewLine |> Encoding.UTF8.GetBytes
let trailerbytes = String.Format(CultureInfo.InvariantCulture, "--{0}--", boundary) |> Encoding.UTF8.GetBytes
x.Headers.Add(HttpRequestHeader.ContentType, "multipart/form-data; boundary=" + boundary);
use stream = x.OpenWrite(url, "PUT")
stream.Write(fileHeaderBytes,0,fileHeaderBytes.Length)
use fileStream = File.OpenRead fileInfo.FullName
fileStream.CopyTo(stream, (4*1024))
stream.Write(newlineBytes, 0, newlineBytes.Length)
stream.Write(trailerbytes, 0, trailerbytes.Length)
()
member x.UploadFileAsMultipartAsync (url:Uri) filename =
// event to report back completion of upload
let progressReport = new Event<int64 * int64>()

let computation = async {
let fileTemplate = "--{0}\r\nContent-Disposition: form-data; name=\"{1}\"; filename=\"{2}\"\r\nContent-Type: {3}\r\n\r\n"
let boundary = "---------------------------" + DateTime.Now.Ticks.ToString("x", CultureInfo.InvariantCulture)
let fileInfo = (new FileInfo(Path.GetFullPath(filename)))
let fileHeaderBytes = String.Format(CultureInfo.InvariantCulture, fileTemplate, boundary, "package", "package", "application/octet-stream")
|> Encoding.UTF8.GetBytes
let newlineBytes = Environment.NewLine |> Encoding.UTF8.GetBytes
let trailerbytes = String.Format(CultureInfo.InvariantCulture, "--{0}--", boundary) |> Encoding.UTF8.GetBytes
x.Headers.Add(HttpRequestHeader.ContentType, "multipart/form-data; boundary=" + boundary);
use stream = x.OpenWrite(url, "PUT")
do! stream.AsyncWrite(fileHeaderBytes,0,fileHeaderBytes.Length)
use fileStream = File.OpenRead fileInfo.FullName

// upload file content and report progress
do!
let fileSize = fileInfo.Length
let bufferSize = 4*1024
let buffer = Array.zeroCreate bufferSize

let rec upload bytesWritten = async {
let! count = fileStream.AsyncRead(buffer, 0 ,bufferSize)
if count > 0 then
if count < bufferSize then
do! stream.AsyncWrite(Array.sub buffer 0 count)
else
do! stream.AsyncWrite(buffer)
let bytesWritten = bytesWritten + (count |> int64)
do progressReport.Trigger(bytesWritten, fileSize)
return! upload bytesWritten
}
upload 0L

do! stream.AsyncWrite(newlineBytes, 0, newlineBytes.Length)
do! stream.AsyncWrite(trailerbytes, 0, trailerbytes.Length)
}
computation, progressReport.Publish

let GetUrlWithEndpoint (url: string option) (endPoint: string option) =
let (|UrlWithEndpoint|_|) url =
Expand All @@ -49,17 +73,42 @@ let GetUrlWithEndpoint (url: string option) (endPoint: string option) =


let Push maxTrials url apiKey packageFileName =
let rec push trial =
let rec push trial = async {
tracefn "Pushing package %s to %s - trial %d" packageFileName url trial
try
let client = Utils.createWebClient(url, None)
client.Headers.Add("X-NuGet-ApiKey", apiKey)
client.UploadFileAsMultipart (new Uri(url)) packageFileName
|> ignore
let uploadAsync, progressReport =
client.UploadFileAsMultipartAsync (new Uri(url)) packageFileName
// report progress every second
let reportProgressInterval = 1000
use progressSubscription =
progressReport
|> Observable.sample reportProgressInterval
|> Observable.pairwise
|> Observable.map
(fun ((bytesWrittenPrev,_), (bytesWritten, fileSize)) ->
let kb bytes = (bytes |> float) / 1024.
let bytesWrittenKb = bytesWritten |> kb
let fileSizeKb = fileSize |> kb
let uploadSpeed =
(bytesWrittenKb - (bytesWrittenPrev |> kb)) /
((reportProgressInterval |> float) / 1000.)
let progressPercentage = (bytesWrittenKb / fileSizeKb) * 100.
progressPercentage, uploadSpeed, bytesWrittenKb, fileSizeKb)
|> Observable.subscribe
(fun (progressPercentage, uploadSpeed, bytesWrittenKb, fileSizeKb) ->
tracefn @"Pushing %s: %.0f of %.0f KB uploaded [%.0f %%] [%.0f KB/s]"
packageFileName
bytesWrittenKb
fileSizeKb
progressPercentage
uploadSpeed)
do! uploadAsync
tracefn "Pushing %s complete." packageFileName
with
| exn when trial < maxTrials ->
traceWarnfn "Could not push %s: %s" packageFileName exn.Message
push (trial + 1)

push 1
return! push (trial + 1)
}
push 1 |> Async.RunSynchronously
45 changes: 44 additions & 1 deletion src/Paket.Core/Utils.fs
Original file line number Diff line number Diff line change
Expand Up @@ -328,4 +328,47 @@ let removeFile (fileName : string) =
File.Delete(fileName) |> ok
with _ ->
FileDeleteError fileName |> fail
else ok ()
else ok ()

[<AutoOpen>]
module ObservableExtensions =

let private synchronize f =
let ctx = System.Threading.SynchronizationContext.Current
f (fun g arg ->
let nctx = System.Threading.SynchronizationContext.Current
if ctx <> null && ctx <> nctx then
ctx.Post((fun _ -> g(arg)), null)
else
g(arg))

type Microsoft.FSharp.Control.Async with
static member AwaitObservable(ev1:IObservable<'a>) =
synchronize (fun f ->
Async.FromContinuations((fun (cont,econt,ccont) ->
let rec callback = (fun value ->
remover.Dispose()
f cont value )
and remover : IDisposable = ev1.Subscribe(callback)
() )))

[<RequireQualifiedAccess>]
module Observable =
let sample milliseconds source =
let relay (observer:IObserver<'T>) =
let rec loop () = async {
let! value = Async.AwaitObservable source
observer.OnNext value
do! Async.Sleep milliseconds
return! loop()
}
loop ()

{ new IObservable<'T> with
member this.Subscribe(observer:IObserver<'T>) =
let cts = new System.Threading.CancellationTokenSource()
Async.StartImmediate(relay observer, cts.Token)
{ new IDisposable with
member this.Dispose() = cts.Cancel()
}
}