From e958e7dfcb1128a72fa381e4ebd7bd772a25ba83 Mon Sep 17 00:00:00 2001 From: Isaac Abraham Date: Thu, 1 Jun 2017 14:14:35 +0200 Subject: [PATCH 1/5] Initial work on moving to WASB. --- src/MBrace.Azure/Store/BlobStore.fs | 154 ++++++++++------------------ 1 file changed, 56 insertions(+), 98 deletions(-) diff --git a/src/MBrace.Azure/Store/BlobStore.fs b/src/MBrace.Azure/Store/BlobStore.fs index 994031fb..070ec003 100644 --- a/src/MBrace.Azure/Store/BlobStore.fs +++ b/src/MBrace.Azure/Store/BlobStore.fs @@ -14,25 +14,24 @@ open MBrace.Azure.Runtime.Utilities [] module private BlobUtils = - - let rootPath = "/" - let rootPathAlt = @"\" - let delims = [|'/'; '\\'|] - - let isPathRooted (path : string) = - path.StartsWith rootPath || path.StartsWith rootPathAlt + let containerDelimiter = '@' - let normalizePath (path : string) = - let normalized = path.Split(delims, StringSplitOptions.RemoveEmptyEntries) |> String.concat "/" - "/" + normalized + let isWasbPath (path:string) = + path + |> Seq.filter ((=) containerDelimiter) + |> Seq.length = 1 - let ensureRooted (path : string) = - if isPathRooted path then path else raise <| FormatException(sprintf "Invalid path %A. Paths should start with '/' or '\\'." path) + let splitPath = + let folderDelimiters = [| '@'; '/'; '\\'|] + fun (path : string) -> + if isWasbPath path then path.Split(folderDelimiters, 2, StringSplitOptions.RemoveEmptyEntries) + else [| path |] /// Azure blob container. - type Container = - | Root - | Container of string + type Container = Container of string override this.ToString() = this |> function | Container x -> x + + type InvalidWasbPathException(path:string) = + inherit Exception(sprintf "Invalid store path '%s'. A valid store path should confirm to WASB standard e.g. container@folder/folder/file.txt" path) /// Represents a 'directory' in blob storage. type StoreDirectory = @@ -42,23 +41,23 @@ module private BlobUtils = } static member Parse(path : string) = - let path = ensureRooted path - let xs = path.Split(delims, 2, StringSplitOptions.RemoveEmptyEntries) - match xs with - | [||] -> { Container = Root; SubDirectory = None } + match splitPath path with | [|c|] -> Validate.containerName c { Container = Container c ; SubDirectory = None } - | [|c; x|] -> Validate.containerName c { Container = Container c; SubDirectory = Some (x + "/") } - - | _ -> raise <| new FormatException(sprintf "Invalid store path %A." path) + | _ -> raise <| InvalidWasbPathException path static member Validate(path : string) = ignore <| StoreDirectory.Parse path + override this.ToString() = + match this.SubDirectory with + | None -> string this.Container + | Some subDirectory -> sprintf "%O@%s" this.Container subDirectory + /// Represents a full path to a blob. type StorePath = { @@ -70,30 +69,28 @@ module private BlobUtils = ignore <| StorePath.Parse path static member Parse(path : string) = - let path = ensureRooted path - let xs = path.Split(delims, 2, StringSplitOptions.RemoveEmptyEntries) - match xs with - | [|x|] -> { Container = Root; BlobName = x } + match splitPath path with | [|c; x|] -> Validate.containerName c { Container = Container c; BlobName = x } + | _ -> raise <| InvalidWasbPathException path + + static member Create(container : string, path : string) = + Validate.containerName container + { Container = Container container; BlobName = path } - | _ -> raise <| new FormatException(sprintf "Invalid store path %A." path) + override this.ToString() = sprintf "%O@%s" this.Container this.BlobName /// /// Creates a blob storage container reference given account and container name /// /// Storage account instance. /// Container name - let getContainerReference (account : AzureStorageAccount) (container : Container) = async { + let getContainerReference (account : AzureStorageAccount) (Container container) = async { let client = account.BlobClient - match container with - | Root -> - let root = client.GetRootContainerReference() - do! root.CreateIfNotExistsAsyncSafe(maxRetries = 3) - return root - | Container c -> - return client.GetContainerReference c + let container = client.GetContainerReference container + do! container.CreateIfNotExistsAsyncSafe(maxRetries = 3) + return container } /// @@ -104,10 +101,6 @@ module private BlobUtils = let getBlobReference account (fullPath : string) = async { let path = StorePath.Parse fullPath let! container = getContainerReference account path.Container - match path.Container with - | Container _ -> do! container.CreateIfNotExistsAsyncSafe(maxRetries = 3) - | Root -> () - return container.GetBlockBlobReference(path.BlobName) } @@ -153,7 +146,7 @@ type BlobStore private (account : AzureStorageAccount, defaultContainer : string let account = account [] - let defaultContainer = normalizePath defaultContainer + let defaultContainer = defaultContainer do StoreDirectory.Validate defaultContainer @@ -176,14 +169,12 @@ type BlobStore private (account : AzureStorageAccount, defaultContainer : string interface ICloudFileStore with member this.BeginWrite(path: string): Async = async { - let path = normalizePath path let! blob = getBlobReference account path let! stream = blob.OpenWriteAsync() |> Async.AwaitTaskCorrect return stream :> Stream } member this.ReadETag(path: string, etag: ETag): Async = async { - let path = normalizePath path let! blob = getBlobReference account path let! stream = blob.OpenReadAsync(AccessCondition.GenerateIfMatchCondition(etag), BlobRequestOptions(), null) |> Async.AwaitTaskCorrect |> Async.Catch match stream with @@ -197,7 +188,6 @@ type BlobStore private (account : AzureStorageAccount, defaultContainer : string } member this.TryGetETag(path: string): Async = async { - let path = normalizePath path let! blob = getBlobReference account path try do! blob.FetchAttributesAsync() |> Async.AwaitTaskCorrect @@ -215,24 +205,22 @@ type BlobStore private (account : AzureStorageAccount, defaultContainer : string member this.DefaultDirectory = defaultContainer member this.WithDefaultDirectory (newContainer : string) = - let newContainer = normalizePath newContainer new BlobStore(account, newContainer) :> _ - member this.RootDirectory = rootPath + member this.RootDirectory = defaultContainer member this.IsCaseSensitiveFileSystem = false - member this.GetRandomDirectoryName() : string = normalizePath <| Guid.NewGuid().ToString() + member this.GetRandomDirectoryName() : string = Guid.NewGuid().ToString() - member this.IsPathRooted(path : string) = isPathRooted path + member this.IsPathRooted(path : string) = isWasbPath path - member this.GetDirectoryName(path : string) = normalizePath <| Path.GetDirectoryName path + member this.GetDirectoryName(path : string) = Path.GetDirectoryName path member this.GetFileName(path : string) = Path.GetFileName(path) - member this.Combine(paths : string []) : string = normalizePath <| Path.Combine paths + member this.Combine(paths : string []) : string = Path.Combine paths member this.GetFileSize(path: string) : Async = async { - let path = normalizePath path let! blob = getBlobReference account path let! result = Async.Catch <| Async.AwaitTaskCorrect(blob.FetchAttributesAsync()) match result with @@ -243,7 +231,6 @@ type BlobStore private (account : AzureStorageAccount, defaultContainer : string } member this.GetLastModifiedTime (path: string, isDirectory : bool) : Async = async { - let path = normalizePath path if isDirectory then let storeDir = StoreDirectory.Parse path let! containerRef = getContainerReference account storeDir.Container @@ -270,7 +257,6 @@ type BlobStore private (account : AzureStorageAccount, defaultContainer : string } member this.FileExists(path: string) : Async = async { - let path = normalizePath path let storePath = StorePath.Parse path let! container = getContainerReference account storePath.Container @@ -284,15 +270,13 @@ type BlobStore private (account : AzureStorageAccount, defaultContainer : string member this.EnumerateFiles(container : string) : Async = async { try - let container = normalizePath container - let path = StoreDirectory.Parse container - let! containerRef = getContainerReference account path.Container - let! listedBlobs = listBlobItemsAsync containerRef path.SubDirectory + let storeDirectory = StoreDirectory.Parse container + let! containerRef = getContainerReference account storeDirectory.Container + let! listedBlobs = listBlobItemsAsync containerRef storeDirectory.SubDirectory return listedBlobs |> Seq.choose (function :? ICloudBlob as cb -> Some cb.Name | _ -> None) - |> Seq.map (fun b -> this.Combine(container, b)) - |> Seq.map normalizePath + |> Seq.map (fun b -> StorePath.Create(container, b) |> string) |> Seq.toArray with e when StoreException.NotFound e -> @@ -301,7 +285,6 @@ type BlobStore private (account : AzureStorageAccount, defaultContainer : string member this.DeleteFile(path: string) : Async = async { try - let path = normalizePath path let! blob = getBlobReference account path do! blob.DeleteAsync() |> Async.AwaitTaskCorrect with e when StoreException.NotFound e -> @@ -309,33 +292,21 @@ type BlobStore private (account : AzureStorageAccount, defaultContainer : string } member this.DirectoryExists(container: string) : Async = async { - let container = normalizePath container let path = StoreDirectory.Parse container - match path.Container with - | Container _ as c -> - let! container = getContainerReference account c - return! container.ExistsAsync() |> Async.AwaitTaskCorrect - | Root -> - return true + let! container = getContainerReference account path.Container + return! container.ExistsAsync() |> Async.AwaitTaskCorrect } member this.CreateDirectory(container: string) : Async = async { - let container = normalizePath container let path = StoreDirectory.Parse container - match path.Container with - | Container _ as c -> - let! container = getContainerReference account c - do! container.CreateIfNotExistsAsyncSafe(maxRetries = 3) - return () - | Root -> - return () + let! container = getContainerReference account path.Container + do! container.CreateIfNotExistsAsyncSafe(maxRetries = 3) + return () } member this.DeleteDirectory(container: string, _recursiveDelete : bool) : Async = async { - let container = normalizePath container let path = StoreDirectory.Parse container match path with - | { Container = Root; SubDirectory = _ } -> return invalidArg "container" "Cannot delete the root container." | { Container = c; SubDirectory = None } -> let! container = getContainerReference account c let! _ = container.DeleteIfExistsAsync() |> Async.AwaitTaskCorrect @@ -346,7 +317,7 @@ type BlobStore private (account : AzureStorageAccount, defaultContainer : string let! blobs = listSubdirBlobsAsync sub do! blobs |> Seq.map (fun b -> async { let p = b.Uri.Segments |> Array.last - let! blob = getBlobReference account (normalizePath <| Path.Combine(container.Name,p)) + let! blob = getBlobReference account (Path.Combine(container.Name,p)) let! _ = blob.DeleteIfExistsAsync() |> Async.AwaitTaskCorrect () }) @@ -356,28 +327,20 @@ type BlobStore private (account : AzureStorageAccount, defaultContainer : string member this.EnumerateDirectories(directory : string) : Async = async { try - let directory = normalizePath directory - let path = StoreDirectory.Parse directory - match path with - | { Container = Root; SubDirectory = _ } -> - let! containers = listContainersAsync account.BlobClient - return containers |> Seq.map (fun c -> normalizePath c.Name) |> Seq.toArray - - | { Container = (Container cnt as c) ; SubDirectory = subdir } -> - let! cref = getContainerReference account c - let! listedEntries = listBlobItemsAsync cref subdir - return - listedEntries - |> Seq.choose (function :? CloudBlobDirectory as d -> Some d | _ -> None) - |> Seq.map (fun d -> normalizePath <| Path.Combine(cnt, d.Prefix)) - |> Seq.toArray + let ({ Container = (Container cnt as c) ; SubDirectory = subdir }) = StoreDirectory.Parse directory + let! cref = getContainerReference account c + let! listedEntries = listBlobItemsAsync cref subdir + return + listedEntries + |> Seq.choose (function :? CloudBlobDirectory as d -> Some d | _ -> None) + |> Seq.map (fun d -> Path.Combine(cnt, d.Prefix)) + |> Seq.toArray with e when StoreException.NotFound e -> return raise <| new DirectoryNotFoundException(directory, e) } member this.WriteETag(path: string, writer : Stream -> Async<'R>) : Async = async { - let path = normalizePath path let! blob = getBlobReference account path // http://msdn.microsoft.com/en-us/library/azure/dd179431.aspx let! result = async { @@ -390,7 +353,6 @@ type BlobStore private (account : AzureStorageAccount, defaultContainer : string member this.BeginRead(path: string) : Async = async { try - let path = normalizePath path let! blob = getBlobReference account path return! blob.OpenReadAsync() |> Async.AwaitTaskCorrect with e when StoreException.NotFound e -> @@ -398,7 +360,6 @@ type BlobStore private (account : AzureStorageAccount, defaultContainer : string } member this.UploadFromStream(path: string, source: Stream) : Async = async { - let path = normalizePath path let! blob = getBlobReference account path let options = BlobRequestOptions(ServerTimeout = Nullable<_>(TimeSpan.FromMinutes(40.))) do! blob.UploadFromStreamAsync(source, null, options, OperationContext()).ContinueWith ignore |> Async.AwaitTaskCorrect @@ -406,7 +367,6 @@ type BlobStore private (account : AzureStorageAccount, defaultContainer : string member this.DownloadToStream(path: string, target: Stream) : Async = async { try - let path = normalizePath path let! blob = getBlobReference account path do! blob.DownloadToStreamAsync(target) |> Async.AwaitTaskCorrect with e when StoreException.NotFound e -> @@ -414,14 +374,12 @@ type BlobStore private (account : AzureStorageAccount, defaultContainer : string } member this.UploadFromLocalFile(source : string, target : string) : Async = async { - let target = normalizePath target let! blob = getBlobReference account target let options = BlobRequestOptions(ServerTimeout = Nullable<_>(TimeSpan.FromMinutes(40.))) do! blob.UploadFromFileAsync(source, null, options, OperationContext()) |> Async.AwaitTaskCorrect } member this.DownloadToLocalFile(source : string, target : string) : Async = async { - let source = normalizePath source let! blob = getBlobReference account source let! exists = blob.ExistsAsync() |> Async.AwaitTaskCorrect if not exists then raise <| new FileNotFoundException(source) From 3ba24e5f8f6f4810c0469fee1443026f58f19954 Mon Sep 17 00:00:00 2001 From: Isaac Abraham Date: Fri, 2 Jun 2017 17:39:04 +0200 Subject: [PATCH 2/5] More support for WASB across directories. --- src/MBrace.Azure/Store/BlobStore.fs | 30 +++++++++++++++++------------ 1 file changed, 18 insertions(+), 12 deletions(-) diff --git a/src/MBrace.Azure/Store/BlobStore.fs b/src/MBrace.Azure/Store/BlobStore.fs index 070ec003..086d3601 100644 --- a/src/MBrace.Azure/Store/BlobStore.fs +++ b/src/MBrace.Azure/Store/BlobStore.fs @@ -53,6 +53,10 @@ module private BlobUtils = static member Validate(path : string) = ignore <| StoreDirectory.Parse path + static member Create(container : string, path : string option) = + Validate.containerName container + { Container = Container container; SubDirectory = path } + override this.ToString() = match this.SubDirectory with | None -> string this.Container @@ -89,20 +93,21 @@ module private BlobUtils = let getContainerReference (account : AzureStorageAccount) (Container container) = async { let client = account.BlobClient let container = client.GetContainerReference container - do! container.CreateIfNotExistsAsyncSafe(maxRetries = 3) return container } + let getBlobReferenceFull account fullPath ensureContainerExists = async { + let path = StorePath.Parse fullPath + let! container = getContainerReference account path.Container + if ensureContainerExists then do! container.CreateIfNotExistsAsyncSafe(maxRetries = 3) + return container.GetBlockBlobReference(path.BlobName) } + /// /// Creates a blob reference given account and full path. /// /// Cloud storage account. /// Path to blob. - let getBlobReference account (fullPath : string) = async { - let path = StorePath.Parse fullPath - let! container = getContainerReference account path.Container - return container.GetBlockBlobReference(path.BlobName) - } + let getBlobReference account (fullPath : string) = getBlobReferenceFull account fullPath false /// Asynchronously lists blob items for given container and prefix let listBlobItemsAsync (container : CloudBlobContainer) (prefix : string option) = async { @@ -169,7 +174,7 @@ type BlobStore private (account : AzureStorageAccount, defaultContainer : string interface ICloudFileStore with member this.BeginWrite(path: string): Async = async { - let! blob = getBlobReference account path + let! blob = getBlobReferenceFull account path true let! stream = blob.OpenWriteAsync() |> Async.AwaitTaskCorrect return stream :> Stream } @@ -203,11 +208,11 @@ type BlobStore private (account : AzureStorageAccount, defaultContainer : string member this.Name = "MBrace.Azure.Store.BlobStore" member this.Id : string = account.CloudStorageAccount.BlobStorageUri.PrimaryUri.ToString() - member this.DefaultDirectory = defaultContainer + member this.DefaultDirectory = "" member this.WithDefaultDirectory (newContainer : string) = new BlobStore(account, newContainer) :> _ - member this.RootDirectory = defaultContainer + member this.RootDirectory = "" member this.IsCaseSensitiveFileSystem = false member this.GetRandomDirectoryName() : string = Guid.NewGuid().ToString() @@ -271,6 +276,7 @@ type BlobStore private (account : AzureStorageAccount, defaultContainer : string member this.EnumerateFiles(container : string) : Async = async { try let storeDirectory = StoreDirectory.Parse container + let (Container container) = storeDirectory.Container let! containerRef = getContainerReference account storeDirectory.Container let! listedBlobs = listBlobItemsAsync containerRef storeDirectory.SubDirectory return @@ -333,7 +339,7 @@ type BlobStore private (account : AzureStorageAccount, defaultContainer : string return listedEntries |> Seq.choose (function :? CloudBlobDirectory as d -> Some d | _ -> None) - |> Seq.map (fun d -> Path.Combine(cnt, d.Prefix)) + |> Seq.map (fun d -> StoreDirectory.Create(cnt, Some d.Prefix) |> string) |> Seq.toArray with e when StoreException.NotFound e -> @@ -360,7 +366,7 @@ type BlobStore private (account : AzureStorageAccount, defaultContainer : string } member this.UploadFromStream(path: string, source: Stream) : Async = async { - let! blob = getBlobReference account path + let! blob = getBlobReferenceFull account path true let options = BlobRequestOptions(ServerTimeout = Nullable<_>(TimeSpan.FromMinutes(40.))) do! blob.UploadFromStreamAsync(source, null, options, OperationContext()).ContinueWith ignore |> Async.AwaitTaskCorrect } @@ -374,7 +380,7 @@ type BlobStore private (account : AzureStorageAccount, defaultContainer : string } member this.UploadFromLocalFile(source : string, target : string) : Async = async { - let! blob = getBlobReference account target + let! blob = getBlobReferenceFull account target true let options = BlobRequestOptions(ServerTimeout = Nullable<_>(TimeSpan.FromMinutes(40.))) do! blob.UploadFromFileAsync(source, null, options, OperationContext()) |> Async.AwaitTaskCorrect } From 746c8b706c1ae2d60e192526e945c110f2827775 Mon Sep 17 00:00:00 2001 From: Isaac Abraham Date: Thu, 8 Jun 2017 12:48:37 +0200 Subject: [PATCH 3/5] More work on WASB --- src/MBrace.Azure/Store/BlobStore.fs | 91 +++++++++++++++-------------- 1 file changed, 48 insertions(+), 43 deletions(-) diff --git a/src/MBrace.Azure/Store/BlobStore.fs b/src/MBrace.Azure/Store/BlobStore.fs index 086d3601..bec2b137 100644 --- a/src/MBrace.Azure/Store/BlobStore.fs +++ b/src/MBrace.Azure/Store/BlobStore.fs @@ -22,10 +22,12 @@ module private BlobUtils = |> Seq.length = 1 let splitPath = - let folderDelimiters = [| '@'; '/'; '\\'|] fun (path : string) -> - if isWasbPath path then path.Split(folderDelimiters, 2, StringSplitOptions.RemoveEmptyEntries) - else [| path |] + if isWasbPath path then + match path.Split([| containerDelimiter |], 2) with + | [| container; blob |] -> [| container; blob.TrimStart '/' |] + | parts -> parts + else [| path.TrimStart '/' |] /// Azure blob container. type Container = Container of string override this.ToString() = this |> function | Container x -> x @@ -42,12 +44,12 @@ module private BlobUtils = static member Parse(path : string) = match splitPath path with - | [|c|] -> - Validate.containerName c - { Container = Container c ; SubDirectory = None } - | [|c; x|] -> - Validate.containerName c - { Container = Container c; SubDirectory = Some (x + "/") } + | [| container |] -> + Validate.containerName container + { Container = Container container ; SubDirectory = None } + | [| container; blob |] -> + Validate.containerName container + { Container = Container container; SubDirectory = Some (blob + "/") } | _ -> raise <| InvalidWasbPathException path static member Validate(path : string) = @@ -68,15 +70,13 @@ module private BlobUtils = Container : Container BlobName : string } - - static member Validate(path : string) = - ignore <| StorePath.Parse path - - static member Parse(path : string) = - match splitPath path with - | [|c; x|] -> - Validate.containerName c - { Container = Container c; BlobName = x } + static member Parse(path : string) defaultContainer = + match defaultContainer, splitPath path with + | _, [| container; blob |] -> + Validate.containerName container + { Container = Container container; BlobName = blob } + | Some defaultContainer, [| blob |] -> + { Container = Container defaultContainer; BlobName = blob } | _ -> raise <| InvalidWasbPathException path static member Create(container : string, path : string) = @@ -96,8 +96,8 @@ module private BlobUtils = return container } - let getBlobReferenceFull account fullPath ensureContainerExists = async { - let path = StorePath.Parse fullPath + let getBlobReferenceFull defaultContainer account path ensureContainerExists = async { + let path = StorePath.Parse path defaultContainer let! container = getContainerReference account path.Container if ensureContainerExists then do! container.CreateIfNotExistsAsyncSafe(maxRetries = 3) return container.GetBlockBlobReference(path.BlobName) } @@ -107,7 +107,7 @@ module private BlobUtils = /// /// Cloud storage account. /// Path to blob. - let getBlobReference account (fullPath : string) = getBlobReferenceFull account fullPath false + let getBlobReference defaultContainer account path = getBlobReferenceFull defaultContainer account path false /// Asynchronously lists blob items for given container and prefix let listBlobItemsAsync (container : CloudBlobContainer) (prefix : string option) = async { @@ -145,7 +145,7 @@ module private BlobUtils = /// MBrace File Store implementation that uses Azure Blob Storage as backend. [] -type BlobStore private (account : AzureStorageAccount, defaultContainer : string) = +type BlobStore private (account : AzureStorageAccount, defaultContainer : string option) = [] let account = account @@ -153,7 +153,9 @@ type BlobStore private (account : AzureStorageAccount, defaultContainer : string [] let defaultContainer = defaultContainer - do StoreDirectory.Validate defaultContainer + let getBlobReferenceFull = getBlobReferenceFull defaultContainer + let getBlobReference = getBlobReference defaultContainer + do defaultContainer |> Option.iter StoreDirectory.Validate /// /// Creates an Azure blob store based CloudFileStore instance that connects to provided storage account. @@ -162,7 +164,7 @@ type BlobStore private (account : AzureStorageAccount, defaultContainer : string /// Default container to be used be store instance. static member Create(account : AzureStorageAccount, ?defaultContainer : string) = ignore account.ConnectionString // force check that connection string is present in current host. - new BlobStore(account, defaultArg defaultContainer "/") + new BlobStore(account, defaultContainer) /// /// Creates an Azure blob store based CloudFileStore instance that connects to provided connection string. @@ -208,30 +210,32 @@ type BlobStore private (account : AzureStorageAccount, defaultContainer : string member this.Name = "MBrace.Azure.Store.BlobStore" member this.Id : string = account.CloudStorageAccount.BlobStorageUri.PrimaryUri.ToString() - member this.DefaultDirectory = "" + member this.DefaultDirectory = defaultContainer |> defaultArg <| "" member this.WithDefaultDirectory (newContainer : string) = - new BlobStore(account, newContainer) :> _ + new BlobStore(account, Some newContainer) :> _ member this.RootDirectory = "" member this.IsCaseSensitiveFileSystem = false member this.GetRandomDirectoryName() : string = Guid.NewGuid().ToString() - member this.IsPathRooted(path : string) = isWasbPath path + member this.IsPathRooted(path : string) = isWasbPath path || (not(Path.HasExtension path) && not (path.Contains "/")) member this.GetDirectoryName(path : string) = Path.GetDirectoryName path member this.GetFileName(path : string) = Path.GetFileName(path) - member this.Combine(paths : string []) : string = Path.Combine paths + member this.Combine(paths : string []) : string = + StorePath.Parse (Path.Combine paths.[1..]) (Some paths.[0]) + |> string member this.GetFileSize(path: string) : Async = async { let! blob = getBlobReference account path let! result = Async.Catch <| Async.AwaitTaskCorrect(blob.FetchAttributesAsync()) match result with - | Choice1Of2 () when blob.Properties.Length = -1L -> return! Async.Raise <| FileNotFoundException(path) + | Choice1Of2 () when blob.Properties.Length = -1L -> return! Async.Raise <| FileNotFoundException path | Choice1Of2 () -> return blob.Properties.Length - | Choice2Of2 ex when StoreException.NotFound ex -> return! Async.Raise <| FileNotFoundException(path) + | Choice2Of2 ex when StoreException.NotFound ex -> return! Async.Raise <| FileNotFoundException path | Choice2Of2 ex -> return! Async.Raise ex } @@ -244,7 +248,7 @@ type BlobStore private (account : AzureStorageAccount, defaultContainer : string | Choice1Of2 () -> let lm = containerRef.Properties.LastModified if lm.HasValue then return lm.Value - else return! Async.Raise <| DirectoryNotFoundException(path) + else return! Async.Raise <| DirectoryNotFoundException path | Choice2Of2 ex when StoreException.NotFound ex -> return! Async.Raise <| DirectoryNotFoundException(path) | Choice2Of2 ex -> return! Async.Raise ex @@ -255,19 +259,19 @@ type BlobStore private (account : AzureStorageAccount, defaultContainer : string | Choice1Of2 () -> let lm = blob.Properties.LastModified if lm.HasValue then return lm.Value - else return! Async.Raise <| FileNotFoundException(path) + else return! Async.Raise <| FileNotFoundException path - | Choice2Of2 ex when StoreException.NotFound ex -> return! Async.Raise <| FileNotFoundException(path) + | Choice2Of2 ex when StoreException.NotFound ex -> return! Async.Raise <| FileNotFoundException path | Choice2Of2 ex -> return! Async.Raise ex } member this.FileExists(path: string) : Async = async { - let storePath = StorePath.Parse path - let! container = getContainerReference account storePath.Container + let path = StorePath.Parse path defaultContainer + let! container = getContainerReference account path.Container - let! b1 = container.ExistsAsync() |> Async.AwaitTaskCorrect - if b1 then - let! blob = getBlobReference account path + let! containerExists = container.ExistsAsync() |> Async.AwaitTaskCorrect + if containerExists then + let! blob = getBlobReference account (string path) return! blob.ExistsAsync() |> Async.AwaitTaskCorrect else return false @@ -321,12 +325,13 @@ type BlobStore private (account : AzureStorageAccount, defaultContainer : string let! container = getContainerReference account c let sub = container.GetDirectoryReference(s) let! blobs = listSubdirBlobsAsync sub - do! blobs |> Seq.map (fun b -> async { + do! + blobs + |> Seq.map (fun b -> async { let p = b.Uri.Segments |> Array.last - let! blob = getBlobReference account (Path.Combine(container.Name,p)) + let! blob = getBlobReference account (StorePath.Create(container.Name, p).ToString()) let! _ = blob.DeleteIfExistsAsync() |> Async.AwaitTaskCorrect - () - }) + () }) |> Async.Parallel |> Async.Ignore } @@ -388,7 +393,7 @@ type BlobStore private (account : AzureStorageAccount, defaultContainer : string member this.DownloadToLocalFile(source : string, target : string) : Async = async { let! blob = getBlobReference account source let! exists = blob.ExistsAsync() |> Async.AwaitTaskCorrect - if not exists then raise <| new FileNotFoundException(source) + if not exists then raise <| FileNotFoundException source let options = BlobRequestOptions(ServerTimeout = Nullable<_>(TimeSpan.FromMinutes(40.))) do! blob.DownloadToFileAsync(target, FileMode.Create, null, options, OperationContext()) |> Async.AwaitTaskCorrect } \ No newline at end of file From d70b640aa909eab7506c872ec39a7fc679e2c79e Mon Sep 17 00:00:00 2001 From: Isaac Abraham Date: Fri, 9 Jun 2017 08:34:16 +0200 Subject: [PATCH 4/5] Remove default user data directory. --- src/MBrace.Azure/Configuration/Arguments.fs | 4 ---- src/MBrace.Azure/Configuration/Configuration.fs | 6 ------ src/MBrace.Azure/Runtime/Client.fs | 9 ++++----- src/MBrace.Azure/Runtime/ClusterId.fs | 5 ----- src/MBrace.Azure/Runtime/ClusterManager.fs | 10 ++-------- src/MBrace.Azure/Store/BlobStore.fs | 12 +++++++++--- tests/MBrace.Azure.Tests/Utils.fs | 4 ++-- 7 files changed, 17 insertions(+), 33 deletions(-) diff --git a/src/MBrace.Azure/Configuration/Arguments.fs b/src/MBrace.Azure/Configuration/Arguments.fs index 78d92bd2..b0663580 100644 --- a/src/MBrace.Azure/Configuration/Arguments.fs +++ b/src/MBrace.Azure/Configuration/Arguments.fs @@ -34,7 +34,6 @@ type private AzureArguments = | Runtime_Topic of topic_name:string // Blob Storage | Runtime_Container of container:string - | User_Data_Container of container:string | Assembly_Container of container:string | Cloud_Value_Container of container:string // Table Storage @@ -63,7 +62,6 @@ type private AzureArguments = | Runtime_Queue _ -> "Specifies the work item queue name in the ServiceBus." | Runtime_Topic _ -> "Specifies the work item topic name in the ServiceBus." | Runtime_Container _ -> "Specifies the blob container name used for persisting MBrace cluster data." - | User_Data_Container _ -> "Specifies the blob container name used for persisting MBrace user data." | Assembly_Container _ -> "Specifies the blob container name used for persisting Assembly dependencies." | Cloud_Value_Container _ -> "Specifies the blob container name used for persisting CloudValue dependencies." | Runtime_Table _ -> "Specifies the table name used for writing MBrace cluster entries." @@ -128,7 +126,6 @@ type ArgumentConfiguration = yield Runtime_Topic config.WorkItemTopic yield Runtime_Container config.RuntimeContainer - yield User_Data_Container config.UserDataContainer yield Assembly_Container config.AssemblyContainer yield Cloud_Value_Container config.CloudValueContainer @@ -166,7 +163,6 @@ type ArgumentConfiguration = parseResult.IterResult(<@ Runtime_Topic @>, fun t -> config.WorkItemTopic <- t) parseResult.IterResult(<@ Runtime_Container @>, fun c -> config.RuntimeContainer <- c) - parseResult.IterResult(<@ User_Data_Container @>, fun c -> config.UserDataContainer <- c) parseResult.IterResult(<@ Assembly_Container @>, fun c -> config.AssemblyContainer <- c) parseResult.IterResult(<@ Cloud_Value_Container @>, fun c -> config.CloudValueContainer <- c) diff --git a/src/MBrace.Azure/Configuration/Configuration.fs b/src/MBrace.Azure/Configuration/Configuration.fs index cd9494c7..3617503d 100644 --- a/src/MBrace.Azure/Configuration/Configuration.fs +++ b/src/MBrace.Azure/Configuration/Configuration.fs @@ -44,7 +44,6 @@ type Configuration(storageConnectionString : string, serviceBusConnectionString // Default Blob Storage Containers let mutable runtimeContainer = "mbraceruntimedata" - let mutable userDataContainer = "mbraceuserdata" let mutable cloudValueContainer = "mbracecloudvalue" let mutable assemblyContainer = "mbraceassemblies" @@ -109,11 +108,6 @@ type Configuration(storageConnectionString : string, serviceBusConnectionString with get () = runtimeContainer and set rc = Validate.containerName rc ; runtimeContainer <- rc - /// Azure Storage container used for user data. - member __.UserDataContainer - with get () = userDataContainer - and set udc = Validate.containerName udc ; userDataContainer <- udc - /// Azure Storage container used for Vagabond assembly dependencies. member __.AssemblyContainer with get () = assemblyContainer diff --git a/src/MBrace.Azure/Runtime/Client.fs b/src/MBrace.Azure/Runtime/Client.fs index 2c6ff6e6..a607ffce 100644 --- a/src/MBrace.Azure/Runtime/Client.fs +++ b/src/MBrace.Azure/Runtime/Client.fs @@ -182,11 +182,10 @@ type AzureCluster private (manager : ClusterManager, faultPolicy : FaultPolicy o /// Ignore active workers. Defaults to false. /// Reactivate configuration. Defaults to true. [] - member this.ResetAsync([]?deleteQueues : bool, []?deleteRuntimeState : bool, []?deleteLogs : bool, - []?deleteUserData : bool, []?deleteAssemblyData : bool, []?force : bool, []?reactivate : bool) : Async = + member this.ResetAsync([]?deleteQueues : bool, []?deleteRuntimeState : bool, []?deleteLogs : bool, []?deleteAssemblyData : bool, []?force : bool, []?reactivate : bool) : Async = manager.ResetCluster(?deleteQueues = deleteQueues, ?deleteRuntimeState = deleteRuntimeState, ?deleteLogs = deleteLogs, - ?deleteUserData = deleteUserData, ?deleteAssemblyData = deleteAssemblyData, + ?deleteAssemblyData = deleteAssemblyData, ?force = force, ?reactivate = reactivate) /// @@ -202,11 +201,11 @@ type AzureCluster private (manager : ClusterManager, faultPolicy : FaultPolicy o /// Ignore active workers. Defaults to false. /// Reactivate configuration. Defaults to true. [] - member this.Reset([]?deleteQueues : bool, []?deleteRuntimeState : bool, []?deleteLogs : bool, []?deleteUserData : bool, + member this.Reset([]?deleteQueues : bool, []?deleteRuntimeState : bool, []?deleteLogs : bool, []?deleteAssemblyData : bool, []?force : bool, []?reactivate : bool) : unit = this.ResetAsync(?deleteQueues = deleteQueues, ?deleteRuntimeState = deleteRuntimeState, ?deleteLogs = deleteLogs, - ?deleteUserData = deleteUserData, ?deleteAssemblyData = deleteAssemblyData, + ?deleteAssemblyData = deleteAssemblyData, ?force = force, ?reactivate = reactivate) |> Async.RunSync diff --git a/src/MBrace.Azure/Runtime/ClusterId.fs b/src/MBrace.Azure/Runtime/ClusterId.fs index 8d35d0a5..f2317754 100644 --- a/src/MBrace.Azure/Runtime/ClusterId.fs +++ b/src/MBrace.Azure/Runtime/ClusterId.fs @@ -36,8 +36,6 @@ type ClusterId = /// Runtime blob container. RuntimeContainer : string - /// User data container. - UserDataContainer : string /// Vagabond Assembly Container. VagabondContainer : string /// CloudValue Persist Container. @@ -74,7 +72,6 @@ type ClusterId = do! [| this.DeleteTable this.UserDataTable - this.DeleteContainer this.UserDataContainer this.DeleteContainer this.CloudValueContainer |] |> Async.Parallel @@ -124,7 +121,6 @@ type ClusterId = createTable this.RuntimeLogsTable createContainer this.RuntimeContainer - createContainer this.UserDataContainer createContainer this.CloudValueContainer createContainer this.VagabondContainer |] @@ -160,7 +156,6 @@ type ClusterId = RuntimeContainer = appendVersionAndSuffixId configuration.RuntimeContainer VagabondContainer = configuration.AssemblyContainer CloudValueContainer = appendVersionAndSuffixId configuration.CloudValueContainer - UserDataContainer = appendSuffixId configuration.UserDataContainer RuntimeTable = appendVersionAndSuffixId configuration.RuntimeTable RuntimeLogsTable = appendVersionAndSuffixId configuration.RuntimeLogsTable diff --git a/src/MBrace.Azure/Runtime/ClusterManager.fs b/src/MBrace.Azure/Runtime/ClusterManager.fs index 9fa80818..a08e7c84 100644 --- a/src/MBrace.Azure/Runtime/ClusterManager.fs +++ b/src/MBrace.Azure/Runtime/ClusterManager.fs @@ -53,13 +53,11 @@ type ClusterManager = member r.InitTopicMonitor(?currentWorker : IWorkerId) = TopicMonitor.Create(r.WorkerManager, r.WorkQueue, r.Logger, ?currentWorker = currentWorker) /// Resets the cluster store state with supplied parameters - member r.ResetCluster(?deleteQueues : bool, ?deleteRuntimeState : bool, ?deleteLogs : bool, ?deleteUserData : bool, - ?deleteAssemblyData : bool, ?force : bool, ?reactivate : bool) = async { + member r.ResetCluster(?deleteQueues : bool, ?deleteRuntimeState : bool, ?deleteLogs : bool, ?deleteAssemblyData : bool, ?force : bool, ?reactivate : bool) = async { let deleteQueues = defaultArg deleteQueues true let deleteRuntimeState = defaultArg deleteRuntimeState true let deleteLogs = defaultArg deleteLogs true - let deleteUserData = defaultArg deleteUserData false let deleteAssemblyData = defaultArg deleteAssemblyData false let force = defaultArg force false let reactivate = defaultArg reactivate true @@ -86,10 +84,6 @@ type ClusterManager = logger.LogWarningf "Deleting system log Table %A." clusterId.RuntimeLogsTable do! clusterId.ClearRuntimeLogs() - if deleteUserData then - logger.LogWarningf "Deleting UserData Container %A and Table %A." clusterId.UserDataContainer clusterId.UserDataTable - do! clusterId.ClearUserData() - if deleteAssemblyData then logger.LogWarningf "Deleting Vagabond Container %A." clusterId.VagabondContainer do! clusterId.ClearVagabondData() @@ -121,7 +115,7 @@ type ClusterManager = do! clusterId.InitializeAllStoreResources(maxRetries = 20, retryInterval = 3000) logger.LogInfof "Creating MBrace storage primitives" - let fileStore = BlobStore.Create(clusterId.StorageAccount, defaultContainer = clusterId.UserDataContainer) + let fileStore = BlobStore.Create(clusterId.StorageAccount) let atomProvider = TableAtomProvider.Create(clusterId.StorageAccount, defaultTable = clusterId.UserDataTable) let dictionaryProvider = TableDictionaryProvider.Create(clusterId.StorageAccount) let queueProvider = ServiceBusQueueProvider.Create(clusterId.ServiceBusAccount) diff --git a/src/MBrace.Azure/Store/BlobStore.fs b/src/MBrace.Azure/Store/BlobStore.fs index bec2b137..5575de07 100644 --- a/src/MBrace.Azure/Store/BlobStore.fs +++ b/src/MBrace.Azure/Store/BlobStore.fs @@ -219,15 +219,21 @@ type BlobStore private (account : AzureStorageAccount, defaultContainer : string member this.GetRandomDirectoryName() : string = Guid.NewGuid().ToString() - member this.IsPathRooted(path : string) = isWasbPath path || (not(Path.HasExtension path) && not (path.Contains "/")) + member this.IsPathRooted(path : string) = isWasbPath path member this.GetDirectoryName(path : string) = Path.GetDirectoryName path member this.GetFileName(path : string) = Path.GetFileName(path) member this.Combine(paths : string []) : string = - StorePath.Parse (Path.Combine paths.[1..]) (Some paths.[0]) - |> string + let (|ContainerPath|FullPath|) paths = + match paths with + | [| ""; folder |] -> ContainerPath folder + | _ -> FullPath (paths.[0], paths.[1..]) + + match paths with + | ContainerPath folder -> StoreDirectory.Parse folder |> string + | FullPath (container, blob) -> StorePath.Parse (Path.Combine blob) (Some container) |> string member this.GetFileSize(path: string) : Async = async { let! blob = getBlobReference account path diff --git a/tests/MBrace.Azure.Tests/Utils.fs b/tests/MBrace.Azure.Tests/Utils.fs index 30edc41c..1d038012 100644 --- a/tests/MBrace.Azure.Tests/Utils.fs +++ b/tests/MBrace.Azure.Tests/Utils.fs @@ -40,7 +40,7 @@ type ClusterSession(config : MBrace.Azure.Configuration, localWorkerCount : int, | None -> let cluster = AzureCluster.Connect(config, logger = ConsoleLogger(), logLevel = LogLevel.Debug) if localWorkerCount > 0 then - cluster.Reset(force = false, deleteUserData = true, deleteAssemblyData = true, reactivate = true) + cluster.Reset(force = false, deleteAssemblyData = true, reactivate = true) attachWorkers cluster while cluster.Workers.Length < localWorkerCount do Thread.Sleep 100 @@ -53,7 +53,7 @@ type ClusterSession(config : MBrace.Azure.Configuration, localWorkerCount : int, | Some r -> if localWorkerCount > 0 then r.KillAllLocalWorkers() - r.Reset(deleteUserData = true, deleteAssemblyData = true, force = true, reactivate = false) + r.Reset(deleteAssemblyData = true, force = true, reactivate = false) state <- None) From 4a51e6b979ee6c0df0edc8c9eb9f3bab4512b6f0 Mon Sep 17 00:00:00 2001 From: Isaac Abraham Date: Fri, 9 Jun 2017 11:37:04 +0200 Subject: [PATCH 5/5] Replace let-bound functions in BlobStore with private getters (serialization [] issues?) --- src/MBrace.Azure/Store/BlobStore.fs | 34 ++++++++++++++--------------- 1 file changed, 17 insertions(+), 17 deletions(-) diff --git a/src/MBrace.Azure/Store/BlobStore.fs b/src/MBrace.Azure/Store/BlobStore.fs index 5575de07..5ae0024b 100644 --- a/src/MBrace.Azure/Store/BlobStore.fs +++ b/src/MBrace.Azure/Store/BlobStore.fs @@ -146,15 +146,12 @@ module private BlobUtils = /// MBrace File Store implementation that uses Azure Blob Storage as backend. [] type BlobStore private (account : AzureStorageAccount, defaultContainer : string option) = - [] let account = account [] let defaultContainer = defaultContainer - let getBlobReferenceFull = getBlobReferenceFull defaultContainer - let getBlobReference = getBlobReference defaultContainer do defaultContainer |> Option.iter StoreDirectory.Validate /// @@ -174,15 +171,18 @@ type BlobStore private (account : AzureStorageAccount, defaultContainer : string static member Create(connectionString : string, ?defaultContainer : string) = BlobStore.Create(AzureStorageAccount.FromConnectionString connectionString, ?defaultContainer = defaultContainer) + member private __.GetBlobReferenceFull with get() = getBlobReferenceFull defaultContainer + member private __.GetBlobReference with get() = getBlobReference defaultContainer + interface ICloudFileStore with member this.BeginWrite(path: string): Async = async { - let! blob = getBlobReferenceFull account path true + let! blob = this.GetBlobReferenceFull account path true let! stream = blob.OpenWriteAsync() |> Async.AwaitTaskCorrect return stream :> Stream } member this.ReadETag(path: string, etag: ETag): Async = async { - let! blob = getBlobReference account path + let! blob = this.GetBlobReference account path let! stream = blob.OpenReadAsync(AccessCondition.GenerateIfMatchCondition(etag), BlobRequestOptions(), null) |> Async.AwaitTaskCorrect |> Async.Catch match stream with | Choice1Of2 s -> @@ -195,7 +195,7 @@ type BlobStore private (account : AzureStorageAccount, defaultContainer : string } member this.TryGetETag(path: string): Async = async { - let! blob = getBlobReference account path + let! blob = this.GetBlobReference account path try do! blob.FetchAttributesAsync() |> Async.AwaitTaskCorrect if String.IsNullOrEmpty blob.Properties.ETag then @@ -236,7 +236,7 @@ type BlobStore private (account : AzureStorageAccount, defaultContainer : string | FullPath (container, blob) -> StorePath.Parse (Path.Combine blob) (Some container) |> string member this.GetFileSize(path: string) : Async = async { - let! blob = getBlobReference account path + let! blob = this.GetBlobReference account path let! result = Async.Catch <| Async.AwaitTaskCorrect(blob.FetchAttributesAsync()) match result with | Choice1Of2 () when blob.Properties.Length = -1L -> return! Async.Raise <| FileNotFoundException path @@ -259,7 +259,7 @@ type BlobStore private (account : AzureStorageAccount, defaultContainer : string | Choice2Of2 ex when StoreException.NotFound ex -> return! Async.Raise <| DirectoryNotFoundException(path) | Choice2Of2 ex -> return! Async.Raise ex else - let! blob = getBlobReference account path + let! blob = this.GetBlobReference account path let! result = Async.Catch <| Async.AwaitTaskCorrect(blob.FetchAttributesAsync()) match result with | Choice1Of2 () -> @@ -277,7 +277,7 @@ type BlobStore private (account : AzureStorageAccount, defaultContainer : string let! containerExists = container.ExistsAsync() |> Async.AwaitTaskCorrect if containerExists then - let! blob = getBlobReference account (string path) + let! blob = this.GetBlobReference account (string path) return! blob.ExistsAsync() |> Async.AwaitTaskCorrect else return false @@ -301,7 +301,7 @@ type BlobStore private (account : AzureStorageAccount, defaultContainer : string member this.DeleteFile(path: string) : Async = async { try - let! blob = getBlobReference account path + let! blob = this.GetBlobReference account path do! blob.DeleteAsync() |> Async.AwaitTaskCorrect with e when StoreException.NotFound e -> return () @@ -335,7 +335,7 @@ type BlobStore private (account : AzureStorageAccount, defaultContainer : string blobs |> Seq.map (fun b -> async { let p = b.Uri.Segments |> Array.last - let! blob = getBlobReference account (StorePath.Create(container.Name, p).ToString()) + let! blob = this.GetBlobReference account (StorePath.Create(container.Name, p).ToString()) let! _ = blob.DeleteIfExistsAsync() |> Async.AwaitTaskCorrect () }) |> Async.Parallel @@ -358,7 +358,7 @@ type BlobStore private (account : AzureStorageAccount, defaultContainer : string } member this.WriteETag(path: string, writer : Stream -> Async<'R>) : Async = async { - let! blob = getBlobReference account path + let! blob = this.GetBlobReference account path // http://msdn.microsoft.com/en-us/library/azure/dd179431.aspx let! result = async { let options = BlobRequestOptions(ServerTimeout = Nullable<_>(TimeSpan.FromMinutes(40.))) @@ -370,34 +370,34 @@ type BlobStore private (account : AzureStorageAccount, defaultContainer : string member this.BeginRead(path: string) : Async = async { try - let! blob = getBlobReference account path + let! blob = this.GetBlobReference account path return! blob.OpenReadAsync() |> Async.AwaitTaskCorrect with e when StoreException.NotFound e -> return raise <| new FileNotFoundException(path, e) } member this.UploadFromStream(path: string, source: Stream) : Async = async { - let! blob = getBlobReferenceFull account path true + let! blob = this.GetBlobReferenceFull account path true let options = BlobRequestOptions(ServerTimeout = Nullable<_>(TimeSpan.FromMinutes(40.))) do! blob.UploadFromStreamAsync(source, null, options, OperationContext()).ContinueWith ignore |> Async.AwaitTaskCorrect } member this.DownloadToStream(path: string, target: Stream) : Async = async { try - let! blob = getBlobReference account path + let! blob = this.GetBlobReference account path do! blob.DownloadToStreamAsync(target) |> Async.AwaitTaskCorrect with e when StoreException.NotFound e -> return raise <| new FileNotFoundException(path, e) } member this.UploadFromLocalFile(source : string, target : string) : Async = async { - let! blob = getBlobReferenceFull account target true + let! blob = this.GetBlobReferenceFull account target true let options = BlobRequestOptions(ServerTimeout = Nullable<_>(TimeSpan.FromMinutes(40.))) do! blob.UploadFromFileAsync(source, null, options, OperationContext()) |> Async.AwaitTaskCorrect } member this.DownloadToLocalFile(source : string, target : string) : Async = async { - let! blob = getBlobReference account source + let! blob = this.GetBlobReference account source let! exists = blob.ExistsAsync() |> Async.AwaitTaskCorrect if not exists then raise <| FileNotFoundException source let options = BlobRequestOptions(ServerTimeout = Nullable<_>(TimeSpan.FromMinutes(40.)))